Python API
Kuzu provides a Python package that you can install via PyPI. A full list of the available functions and classes can be found in the Python API documentation, linked below.
Sync and Async APIs
Kuzu provides both a synchronous and an asynchronous Python API.
The synchronous API is the default. It is the more convenient of the two and a common way to work with Kuzu in Python.
    import kuzu

    def main() -> None:
        # Create an empty on-disk database and connect to it
        db = kuzu.Database("./demo_db")
        conn = kuzu.Connection(db)

        # Create schema
        conn.execute("CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name))")
        conn.execute("CREATE NODE TABLE City(name STRING, population INT64, PRIMARY KEY (name))")
        conn.execute("CREATE REL TABLE Follows(FROM User TO User, since INT64)")
        conn.execute("CREATE REL TABLE LivesIn(FROM User TO City)")

        # Insert data
        conn.execute('COPY User FROM "./data/user.csv"')
        conn.execute('COPY City FROM "./data/city.csv"')
        conn.execute('COPY Follows FROM "./data/follows.csv"')
        conn.execute('COPY LivesIn FROM "./data/lives-in.csv"')

        # Execute Cypher query
        response = conn.execute(
            """
            MATCH (a:User)-[f:Follows]->(b:User)
            RETURN a.name, b.name, f.since;
            """
        )
        while response.has_next():
            print(response.get_next())

    if __name__ == "__main__":
        main()
The asynchronous API is useful for running Kuzu in an async context, such as in web frameworks like FastAPI or cases where you need to concurrently run queries in Kuzu.
    import asyncio
    import shutil

    import kuzu

    shutil.rmtree("test_db", ignore_errors=True)
    db = kuzu.Database("test_db")
    # Create the async connection
    # The underlying connection pool will be automatically created and managed by the async connection
    conn = kuzu.AsyncConnection(db, max_concurrent_queries=4)

    async def create_tables(conn: kuzu.AsyncConnection) -> None:
        """Create the node and relationship tables"""
        await conn.execute("CREATE NODE TABLE User(name STRING PRIMARY KEY, age INT64)")
        await conn.execute("CREATE NODE TABLE City(name STRING PRIMARY KEY, population INT64)")
        await conn.execute("CREATE REL TABLE Follows(FROM User TO User, since INT64)")
        await conn.execute("CREATE REL TABLE LivesIn(FROM User TO City)")

    async def copy_data(conn: kuzu.AsyncConnection) -> None:
        """Copy the data from the CSV files to the node and relationship tables"""
        await conn.execute("COPY User FROM 'example_data/user.csv' (header=true)")
        await conn.execute("COPY City FROM 'example_data/city.csv'")
        await conn.execute("COPY Follows FROM 'example_data/follows.csv'")
        await conn.execute("COPY LivesIn FROM 'example_data/lives-in.csv'")

    async def query_1(conn: kuzu.AsyncConnection) -> None:
        result = await conn.execute("MATCH (u:User)-[:LivesIn]->(c:City) RETURN u.*")
        while result.has_next():
            print(result.get_next())

    async def main() -> None:
        await create_tables(conn)
        await copy_data(conn)
        # Run queries
        await query_1(conn)

    if __name__ == "__main__":
        asyncio.run(main())
The async API in Python is backed by a thread pool, which the async connection creates and manages automatically. All you need to do is pass the max_concurrent_queries parameter to the async connection constructor.
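To make the idea of a thread-pool-backed async API concrete, here is a minimal standard-library sketch. It is not Kuzu's implementation: blocking_query is a hypothetical stand-in for a blocking database call, and the pool size plays the role that max_concurrent_queries is described as playing above.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_query(query_id: int) -> int:
    """Hypothetical stand-in for a blocking database call (not a Kuzu function)."""
    time.sleep(0.05)
    return query_id * 2


async def run_all(num_queries: int, max_concurrent_queries: int) -> list[int]:
    loop = asyncio.get_running_loop()
    # The pool size caps how many blocking calls run at the same time.
    with ThreadPoolExecutor(max_workers=max_concurrent_queries) as pool:
        futures = [
            loop.run_in_executor(pool, blocking_query, i)
            for i in range(num_queries)
        ]
        # gather returns results in submission order.
        return await asyncio.gather(*futures)


results = asyncio.run(run_all(num_queries=8, max_concurrent_queries=4))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
```

With eight calls and four workers, the calls run in roughly two waves while the event loop stays free for other coroutines.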
Run multiple queries in one execution
By default, executing a single query in the Python API returns a QueryResult object. However, if you submit multiple Cypher queries separated by semicolons in a single execution, the API returns a list of QueryResult objects.
You can loop through the list and read the contents of each QueryResult object.
    response = conn.execute("RETURN 1; RETURN 2; RETURN 3")
    # Each item in the list is a QueryResult for one of the queries
    for result in response:
        while result.has_next():
            print(result.get_next())

This returns:

    [1]
    [2]
    [3]
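Because the return shape depends on how many queries were submitted, calling code may want to treat both cases uniformly. The helpers below are a sketch under that assumption: as_result_list and iter_rows are hypothetical names, and FakeResult is a stand-in that only mimics the has_next() and get_next() methods so the example runs without a database.

```python
from typing import Any, Iterator


def as_result_list(response: Any) -> list:
    """Wrap a single result in a list; pass a list of results through."""
    return response if isinstance(response, list) else [response]


def iter_rows(result: Any) -> Iterator[list]:
    """Yield rows from any object exposing has_next() and get_next()."""
    while result.has_next():
        yield result.get_next()


class FakeResult:
    """Hypothetical stand-in for a query result, for illustration only."""

    def __init__(self, rows: list) -> None:
        self._rows = list(rows)

    def has_next(self) -> bool:
        return bool(self._rows)

    def get_next(self) -> list:
        return self._rows.pop(0)


single = FakeResult([[1]])
multiple = [FakeResult([[1]]), FakeResult([[2]]), FakeResult([[3]])]

for response in (single, multiple):
    for result in as_result_list(response):
        for row in iter_rows(result):
            print(row)
```

In real code, the value returned by conn.execute() would take the place of single or multiple.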
DataFrames and Arrow Tables
In Python, Kuzu supports the use of Pandas and Polars DataFrames, as well as PyArrow Tables. This allows you to leverage the data manipulation capabilities of these libraries in your graph workflows.
Output query results
You can output the results of a Cypher query to a Pandas DataFrame, Polars DataFrame, or PyArrow Table. The following examples show how to output query results to each of these data structures.
You can output the results of a Cypher query to a Pandas DataFrame using the get_as_df() method:

    import kuzu
    import pandas as pd

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")
    conn.execute("CREATE (a:Person {name: 'Adam', age: 30})")
    conn.execute("CREATE (a:Person {name: 'Karissa', age: 40})")
    conn.execute("CREATE (a:Person {name: 'Zhang', age: 50})")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_df())

You can return all the columns of a node table by using the * wildcard in the RETURN clause.

        p.name  p.age
    0     Adam     30
    1  Karissa     40
    2    Zhang     50

Return specific columns by name, and optionally alias them, as follows:

    result = conn.execute("MATCH (p:Person) RETURN p.name AS name")
    print(result.get_as_df())

This will return only the name column.

          name
    0     Adam
    1  Karissa
    2    Zhang
You can output the results of a Cypher query to a Polars DataFrame using the get_as_pl() method:

    import kuzu
    import polars as pl

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")
    conn.execute("CREATE (a:Person {name: 'Adam', age: 30})")
    conn.execute("CREATE (a:Person {name: 'Karissa', age: 40})")
    conn.execute("CREATE (a:Person {name: 'Zhang', age: 50})")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_pl())

Using the get_as_pl() method on your query result returns the result as a Polars DataFrame.

    shape: (3, 2)
    ┌─────────┬───────┐
    │ p.name  ┆ p.age │
    │ ---     ┆ ---   │
    │ str     ┆ i64   │
    ╞═════════╪═══════╡
    │ Adam    ┆ 30    │
    │ Karissa ┆ 40    │
    │ Zhang   ┆ 50    │
    └─────────┴───────┘

Return specific columns by name, and optionally alias them, as follows:

    result = conn.execute("MATCH (p:Person) RETURN p.name AS name")
    print(result.get_as_pl())

This will return only the name column.

    shape: (3, 1)
    ┌─────────┐
    │ name    │
    │ ---     │
    │ str     │
    ╞═════════╡
    │ Adam    │
    │ Karissa │
    │ Zhang   │
    └─────────┘
You can output the results of a Cypher query to a PyArrow Table using the get_as_arrow() method:

    import kuzu
    import pyarrow as pa

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")
    conn.execute("CREATE (a:Person {name: 'Adam', age: 30})")
    conn.execute("CREATE (a:Person {name: 'Karissa', age: 40})")
    conn.execute("CREATE (a:Person {name: 'Zhang', age: 50})")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_arrow())

Using the get_as_arrow() method on your query result returns the result as a PyArrow Table.

    pyarrow.Table
    p.name: string
    p.age: int64
    ----
    p.name: [["Adam","Karissa","Zhang"]]
    p.age: [[30,40,50]]
LOAD FROM
You can scan a Pandas DataFrame, Polars DataFrame, or PyArrow Table in Kuzu using the LOAD FROM clause.
Scanning a DataFrame or Table does not copy the data into Kuzu; it only reads the data.

    import kuzu
    import pandas as pd

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    df = pd.DataFrame({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    result = conn.execute("LOAD FROM df RETURN *")
    print(result.get_as_df())

Using the get_as_df() method on your query result returns the result as a Pandas DataFrame.

          name  age
    0     Adam   30
    1  Karissa   40
    2    Zhang   50
    import kuzu
    import polars as pl

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    df = pl.DataFrame({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    result = conn.execute("LOAD FROM df RETURN *")
    print(result.get_as_pl())

Using the get_as_pl() method on your query result returns the result as a Polars DataFrame.

    shape: (3, 2)
    ┌─────────┬─────┐
    │ name    ┆ age │
    │ ---     ┆ --- │
    │ str     ┆ i64 │
    ╞═════════╪═════╡
    │ Adam    ┆ 30  │
    │ Karissa ┆ 40  │
    │ Zhang   ┆ 50  │
    └─────────┴─────┘
    import kuzu
    import pyarrow as pa

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    tbl = pa.table({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    result = conn.execute("LOAD FROM tbl RETURN *")
    print(result.get_as_arrow())

Using the get_as_arrow() method on your query result returns the result as a PyArrow Table.

    pyarrow.Table
    name: string
    age: int64
    ----
    name: [["Adam","Karissa","Zhang"]]
    age: [[30,40,50]]
COPY FROM
Copy from a Pandas DataFrame into a Kuzu table using the COPY FROM command:

    import kuzu
    import pandas as pd

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")

    df = pd.DataFrame({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    conn.execute("COPY Person FROM df")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_df())

Using the get_as_df() method on your query result returns the result as a Pandas DataFrame.

        p.name  p.age
    0     Adam     30
    1  Karissa     40
    2    Zhang     50
Copy from a Polars DataFrame into a Kuzu table using the COPY FROM command:

    import kuzu
    import polars as pl

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")

    df = pl.DataFrame({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    conn.execute("COPY Person FROM df")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_pl())

Using the get_as_pl() method on your query result returns the result as a Polars DataFrame.

    shape: (3, 2)
    ┌─────────┬───────┐
    │ p.name  ┆ p.age │
    │ ---     ┆ ---   │
    │ str     ┆ i64   │
    ╞═════════╪═══════╡
    │ Adam    ┆ 30    │
    │ Karissa ┆ 40    │
    │ Zhang   ┆ 50    │
    └─────────┴───────┘
Copy from a PyArrow Table into a Kuzu table using the COPY FROM command:

    import kuzu
    import pyarrow as pa

    db = kuzu.Database("tmp")
    conn = kuzu.Connection(db)

    conn.execute("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY (name))")

    tbl = pa.table({
        "name": ["Adam", "Karissa", "Zhang"],
        "age": [30, 40, 50]
    })

    conn.execute("COPY Person FROM tbl")

    result = conn.execute("MATCH (p:Person) RETURN p.*")
    print(result.get_as_arrow())

Using the get_as_arrow() method on your query result returns the result as a PyArrow Table.

    pyarrow.Table
    p.name: string
    p.age: int64
    ----
    p.name: [["Adam","Karissa","Zhang"]]
    p.age: [[30,40,50]]
Type notation
This section summarizes the type notation used in Kuzu’s Python API. The table below maps each Python type to the Kuzu LogicalTypeID that is inferred for it from Python type annotations.
Python type | Kuzu LogicalTypeID |
---|---|
bool | BOOL |
int | INT64 |
float | DOUBLE |
str | STRING |
datetime | TIMESTAMP |
date | DATE |
timedelta | INTERVAL |
uuid | UUID |
list | LIST |
dict | MAP |
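To illustrate how a mapping like this can drive inference from annotations, the sketch below reads a function's type hints and looks up each scalar type in a dictionary that mirrors the table. It is not Kuzu's internal code: PYTHON_TO_KUZU and infer_signature are hypothetical names, the values are plain strings rather than LogicalTypeID members, and list and dict are left out for brevity.

```python
import datetime
import uuid
from typing import Callable, get_type_hints

# Mirrors the scalar rows of the table above (illustrative only).
PYTHON_TO_KUZU = {
    bool: "BOOL",
    int: "INT64",
    float: "DOUBLE",
    str: "STRING",
    datetime.datetime: "TIMESTAMP",
    datetime.date: "DATE",
    datetime.timedelta: "INTERVAL",
    uuid.UUID: "UUID",
}


def infer_signature(func: Callable) -> tuple[list[str], str]:
    """Return (parameter type names, return type name) from annotations."""
    hints = get_type_hints(func)
    return_type = PYTHON_TO_KUZU[hints.pop("return")]
    # Remaining hints are the parameters, in definition order.
    params = [PYTHON_TO_KUZU[hint] for hint in hints.values()]
    return params, return_type


def difference(a: int, b: int) -> int:
    return a - b


print(infer_signature(difference))  # (['INT64', 'INT64'], 'INT64')
```

A function with a missing or unsupported annotation raises KeyError here, which is one reason an API might also accept explicit type arguments.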
UDF
Kuzu’s Python API also supports the registration of User Defined Functions (UDFs), allowing you to define custom functions in Python and use them in your Cypher queries. This can allow you to quickly extend Kuzu with new functions you need in your Python applications.
An example of using the UDF API is shown below. We will define a Python UDF that calculates the difference between two numbers, and then apply it in a Cypher query.
Register the UDF
    import kuzu

    db = kuzu.Database("test_db")
    conn = kuzu.Connection(db)

    # define your function
    def difference(a, b):
        return a - b

    # define the expected type of your parameters
    parameters = [kuzu.Type.INT64, kuzu.Type.INT64]

    # define expected type of the returned value
    return_type = kuzu.Type.INT64

    # register the UDF
    conn.create_function("difference", difference, parameters, return_type)
Note that in the example above, we explicitly declared the expected types of the parameters and the return value in Kuzu, prior to registering the UDF.
Alternatively, you can simply use Python type annotations to denote the type signature of the parameters and return value.
    def difference(a: int, b: int) -> int:
        return a - b

    conn.create_function("difference", difference)
Additional parameters
The UDF API’s create_function method accepts the following parameters:
- name (str): The name of the function to be invoked in Cypher.
- udf (Callable[[...], Any]): The function to be executed.
- params_type (Optional[list[Type | str]]): A list whose elements can be either kuzu.Type or str. kuzu.Type can be used to denote non-nested parameter types, while str can be used to denote both nested and non-nested parameter types. Details on how to denote types are in the type notation section.
- return_type (Optional[Type | str]): Either a kuzu.Type enum or str. Details on how to denote types are in the type notation section.
- default_null_handling (Optional[bool]): True by default. When true, if any one of the inputs is null, function execution is skipped and the output is resolved to null.
- catch_exceptions (Optional[bool]): False by default. When true, if the UDF raises an exception, the output is resolved to null. Otherwise, the exception is re-raised.
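The last two flags can be pictured as a thin wrapper around the Python function. The sketch below emulates the described semantics in plain Python so they can be tried without a database; wrap_udf and ratio are hypothetical names, and None stands in for a null value. It is not how Kuzu implements these options.

```python
from typing import Any, Callable


def wrap_udf(
    udf: Callable[..., Any],
    default_null_handling: bool = True,
    catch_exceptions: bool = False,
) -> Callable[..., Any]:
    """Emulate the two flags described above around a plain function."""

    def wrapped(*args: Any) -> Any:
        # Skip execution entirely when any input is null.
        if default_null_handling and any(arg is None for arg in args):
            return None
        try:
            return udf(*args)
        except Exception:
            if catch_exceptions:
                return None  # resolve the output to null
            raise  # otherwise let the exception propagate

    return wrapped


def ratio(a: int, b: int) -> float:
    return a / b


safe_ratio = wrap_udf(ratio, catch_exceptions=True)
print(safe_ratio(10, 4))    # 2.5
print(safe_ratio(None, 4))  # None (null input, function not called)
print(safe_ratio(1, 0))     # None (ZeroDivisionError swallowed)
```

With the defaults, wrap_udf(ratio)(1, 0) raises ZeroDivisionError instead of returning None.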
Apply the UDF
Once the UDF is registered, you can apply it in a Cypher query. First, let’s create some data.
    # create a table
    conn.execute("CREATE NODE TABLE IF NOT EXISTS Item (id INT64, a INT64, b INT64, c INT64, PRIMARY KEY(id))")

    # insert some data
    conn.execute("CREATE (i:Item {id: 1}) SET i.a = 134, i.b = 123")
    conn.execute("CREATE (i:Item {id: 2}) SET i.a = 44, i.b = 29")
    conn.execute("CREATE (i:Item {id: 3}) SET i.a = 32, i.b = 68")

We’re now ready to apply the UDF in a Cypher query:

    # apply the UDF and print the results
    result = conn.execute("MATCH (i:Item) RETURN i.a AS a, i.b AS b, difference(i.a, i.b) AS difference")
    print(result.get_as_df())

The output should be:

         a    b  difference
    0  134  123          11
    1   44   29          15
    2   32   68         -36
Remove UDF
If you want to remove the UDF, call the remove_function method on the connection object with the name under which the function was registered.

    # Use existing connection object
    conn.remove_function("difference")
Nested and complex types
When working with UDFs, you can also specify nested or complex types, though in this case there are some differences from the examples shown above. For these additional types, give a string representation of each parameter type, which is then manually cast to the respective Kuzu type.
Some examples of where this is relevant are listed below:
- A list of INT64 would be "INT64[]"
- A map from a STRING to a DOUBLE would be "MAP(STRING, DOUBLE)"
- A Decimal value with 7 significant figures and 2 decimals would be "DECIMAL(7, 2)"
Note that it’s also valid to define child types through Python’s type annotations, e.g. list[int] or dict[str, float], for simple types.
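The correspondence between nested Python annotations and the string notation above can be sketched as a small recursive function. This is an illustration only: to_type_string and SCALARS are hypothetical names, only a few scalar types are covered, and the sketch does not check whether Kuzu accepts every string it can produce.

```python
from typing import Any, get_args, get_origin

# Scalar names taken from the type notation table (subset, illustrative).
SCALARS = {bool: "BOOL", int: "INT64", float: "DOUBLE", str: "STRING"}


def to_type_string(annotation: Any) -> str:
    """Render a Python annotation in the string notation listed above."""
    origin = get_origin(annotation)
    if origin is list:
        (child,) = get_args(annotation)
        return f"{to_type_string(child)}[]"
    if origin is dict:
        key, value = get_args(annotation)
        return f"MAP({to_type_string(key)}, {to_type_string(value)})"
    return SCALARS[annotation]


print(to_type_string(list[int]))         # INT64[]
print(to_type_string(dict[str, float]))  # MAP(STRING, DOUBLE)
```

Types such as DECIMAL(7, 2) have no direct Python annotation carrying precision and scale, which is why a string is needed for them.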
Below, we show an example to calculate the discounted price of an item using a Python UDF.
    def calculate_discounted_price(price: float, has_discount: bool) -> float:
        # Assume 10% discount on all items for simplicity
        return float(price) * 0.9 if has_discount else price

    # define the expected type of the UDF's parameters
    parameters = ['DECIMAL(7, 2)', kuzu.Type.BOOL]

    # define expected type of the UDF's returned value
    return_type = 'DECIMAL(7, 2)'

    # register the UDF
    conn.create_function(
        "current_price",
        calculate_discounted_price,
        parameters,
        return_type
    )
The second parameter is a built-in native type in Kuzu, i.e., kuzu.Type.BOOL. For the first parameter, we need to specify a string, i.e., "DECIMAL(7, 2)", which is then parsed and used by the binder in Kuzu to map to the internal Decimal representation.
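To get a feel for what 7 significant figures with 2 decimals allows, the sketch below uses Python's standard decimal module. It only illustrates the usual meaning of a (precision, scale) pair; fits_decimal and discounted are hypothetical helpers, and the half-up rounding mode is an assumption made for the example, not a statement about how Kuzu rounds.

```python
from decimal import ROUND_HALF_UP, Decimal

PRECISION = 7  # total number of digits
SCALE = 2      # digits after the decimal point


def fits_decimal(value: Decimal, precision: int = PRECISION, scale: int = SCALE) -> bool:
    """True if value, rounded to `scale` places, has at most `precision` digits."""
    quantized = value.quantize(Decimal(10) ** -scale, rounding=ROUND_HALF_UP)
    return len(quantized.as_tuple().digits) <= precision


def discounted(price: Decimal, has_discount: bool) -> Decimal:
    """Apply a 10% discount and round the result to two decimal places."""
    factor = Decimal("0.9") if has_discount else Decimal("1")
    return (price * factor).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(fits_decimal(Decimal("99999.99")))    # True
print(fits_decimal(Decimal("100000.00")))   # False
print(discounted(Decimal("19.99"), True))   # 17.99
```

Under this reading, the largest representable price is 99999.99, so a discounted price always fits if the original price did.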