DataFusion UDFs

Apache Arrow DataFusion is an embeddable SQL query engine built on Arrow. Because Marrow and DataFusion share the same Arrow columnar memory format, Mojo compute functions can be registered as DataFusion UDFs with zero data copies — DataFusion passes Arrow arrays directly into the Mojo kernel via the C Data Interface.

Setup

DataFusion is an optional dependency in the examples pixi feature. Running the example task installs that environment and then runs the example:

pixi run -e examples datafusion_udf

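Or add it to an existing environment (the package is published on PyPI as datafusion), then import the modules used in the examples below: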

import marrow as ma
import pyarrow as pa
from datafusion import SessionContext, udf

Register a Mojo kernel as a UDF

Any Marrow compute function that accepts and returns Arrow arrays can be wrapped as a DataFusion UDF. The wrapper converts PyArrow arrays to Marrow via the C Data Interface, calls the kernel, and converts back.

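def mojo_add(a: pa.Array, b: pa.Array) -> pa.Array:
    # Import both PyArrow arrays into Marrow, run the Mojo kernel, export the result.
    return pa.array(ma.add(ma.array(a), ma.array(b)))

# Arguments: function, input types, return type, volatility, SQL function name.
add_udf = udf(mojo_add, [pa.int64(), pa.int64()], pa.int64(), "immutable", name="mojo_add")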

Query a table

ctx = SessionContext()
ctx.register_udf(add_udf)

batch = pa.record_batch(
    {
        "price":    pa.array([100, 200, 300, 400, 500], type=pa.int64()),
        "quantity": pa.array([3,   1,   4,   1,   5  ], type=pa.int64()),
    }
)
ctx.register_record_batches("orders", [[batch]])

result = pa.Table.from_batches(
    ctx.sql("SELECT price, quantity, mojo_add(price, quantity) AS total FROM orders").collect()
)
result

Null propagation

Marrow kernels propagate nulls the way Arrow compute kernels conventionally do: a row where either operand is null produces a null result. DataFusion passes these nulls through unchanged.

ctx2 = SessionContext()
ctx2.register_udf(add_udf)

batch2 = pa.record_batch(
    {
        "a": pa.array([1, None, 3, 4   ], type=pa.int64()),
        "b": pa.array([10, 20,  None, 40], type=pa.int64()),
    }
)
ctx2.register_record_batches("t", [[batch2]])

pa.Table.from_batches(
    ctx2.sql("SELECT a, b, mojo_add(a, b) AS result FROM t").collect()
)
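
Under the rule described above, the result column for this table should be [11, null, null, 44]. The following is a minimal pure-Python sketch of that rule, not Marrow's actual implementation: it models an Arrow array as a values list plus a validity list, and the helper name add_with_validity is hypothetical. The output validity is the logical AND of the two input validities, and values in null slots are left as an arbitrary placeholder.

```python
from typing import List, Optional, Tuple


def add_with_validity(
    a_vals: List[int], a_valid: List[bool],
    b_vals: List[int], b_valid: List[bool],
) -> Tuple[List[int], List[bool]]:
    """Toy model of a null-propagating binary kernel (hypothetical helper)."""
    if not (len(a_vals) == len(a_valid) == len(b_vals) == len(b_valid)):
        raise ValueError("all inputs must have the same length")
    # A result slot is valid only when both operands are valid.
    out_valid = [x and y for x, y in zip(a_valid, b_valid)]
    # Null slots get a placeholder value (0); readers must consult validity.
    out_vals = [x + y if ok else 0 for x, y, ok in zip(a_vals, b_vals, out_valid)]
    return out_vals, out_valid


def decode(vals: List[int], valid: List[bool]) -> List[Optional[int]]:
    """Turn (values, validity) back into a list with None for nulls."""
    return [v if ok else None for v, ok in zip(vals, valid)]


# Same data as columns a and b above; null slots hold a placeholder 0.
a_vals, a_valid = [1, 0, 3, 4], [True, False, True, True]
b_vals, b_valid = [10, 20, 0, 40], [True, True, False, True]

out_vals, out_valid = add_with_validity(a_vals, a_valid, b_vals, b_valid)
result = decode(out_vals, out_valid)
print(result)
```

Keeping validity separate from the values is what lets a kernel run branch-free arithmetic over the whole values buffer and settle nullness in a second, cheap pass.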

How it works

Each call to mojo_add inside a DataFusion query goes through these steps:

  1. DataFusion evaluates the query plan and produces a pyarrow.Array chunk for each argument column.
  2. ma.array(a) imports the array into Marrow via __arrow_c_array__ — a pointer exchange, no copy.
  3. ma.add(...) runs the SIMD-vectorised Mojo kernel over the raw Arrow buffers.
  4. pa.array(result) exports the result back to PyArrow via __arrow_c_array__ — again, no copy.

The data layout is identical in both libraries, so steps 2 and 4 are O(1) pointer operations regardless of array length.