DataFusion UDFs

Apache DataFusion is an embeddable SQL query engine built on Arrow. Marrow and DataFusion share the same Arrow columnar memory format, so Mojo compute functions can be registered as DataFusion UDFs with zero data copies. DataFusion passes Arrow arrays directly into the Mojo kernel via the C Data Interface.

Setup

DataFusion is an optional dependency in the `examples` pixi feature. Install it with:

```shell
pixi run -e examples datafusion_udf
```

Or add it to an existing environment, for example with `pip install datafusion`. The examples below use these imports:

```python
import marrow as ma
import pyarrow as pa
from datafusion import SessionContext, udf
```

Register a Mojo kernel as a UDF

Any Marrow compute function that accepts and returns Arrow arrays can be wrapped as a DataFusion UDF. The wrapper converts the PyArrow arrays to Marrow via the C Data Interface, calls the kernel, and converts the result back to PyArrow.

```python
def mojo_add(a: pa.Array, b: pa.Array) -> pa.Array:
    # Import both inputs into Marrow (zero-copy), run the Mojo kernel,
    # then export the result back to PyArrow (zero-copy).
    return pa.array(ma.add(ma.array(a), ma.array(b), None))


# Scalar UDF: two int64 inputs, one int64 output.
# "immutable" tells DataFusion the same inputs always give the same output.
add_udf = udf(mojo_add, [pa.int64(), pa.int64()], pa.int64(), "immutable", name="mojo_add")
```
Query a table

```python
ctx = SessionContext()
ctx.register_udf(add_udf)

batch = pa.record_batch(
    {
        "price": pa.array([100, 200, 300, 400, 500], type=pa.int64()),
        "quantity": pa.array([3, 1, 4, 1, 5], type=pa.int64()),
    }
)
# One partition containing one record batch.
ctx.register_record_batches("orders", [[batch]])

result = pa.Table.from_batches(
    ctx.sql("SELECT price, quantity, mojo_add(price, quantity) AS total FROM orders").collect()
)
result
```

Null propagation
Marrow kernels follow Arrow's null-propagation convention: a row where either operand is null produces a null result. DataFusion preserves this behaviour transparently.
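At the buffer level this rule is cheap: the output validity bitmap is the bitwise AND of the input validity bitmaps. The minimal pure-Python sketch below assumes Arrow's LSB-first bit order (bit `i` is 1 when row `i` is valid) and uses a Python `int` in place of a packed byte buffer. The helpers `validity_bitmap` and `add_with_nulls` are hypothetical illustrations, not Marrow's Mojo implementation.

```python
from typing import List, Optional


def validity_bitmap(values: List[Optional[int]]) -> int:
    """Pack a list into an Arrow-style validity bitmap (bit i set = row i valid)."""
    bits = 0
    for i, v in enumerate(values):
        if v is not None:
            bits |= 1 << i  # LSB-first, as in the Arrow columnar format
    return bits


def add_with_nulls(a: List[Optional[int]], b: List[Optional[int]]) -> List[Optional[int]]:
    """Element-wise add where the result is null if either operand is null."""
    if len(a) != len(b):
        raise ValueError("operands must have the same length")
    # A row is valid in the output only if it is valid in both inputs.
    out_valid = validity_bitmap(a) & validity_bitmap(b)
    return [a[i] + b[i] if (out_valid >> i) & 1 else None for i in range(len(a))]


a = [1, None, 3, 4]
b = [10, 20, None, 40]
print(bin(validity_bitmap(a)), bin(validity_bitmap(b)))  # 0b1101 0b1011
print(add_with_nulls(a, b))  # [11, None, None, 44]
```

Vectorised kernels typically compute values for every slot, null or not, and let the combined bitmap mask the result, which keeps the arithmetic loop branch-free.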
```python
ctx2 = SessionContext()
ctx2.register_udf(add_udf)

batch2 = pa.record_batch(
    {
        "a": pa.array([1, None, 3, 4], type=pa.int64()),
        "b": pa.array([10, 20, None, 40], type=pa.int64()),
    }
)
ctx2.register_record_batches("t", [[batch2]])

pa.Table.from_batches(
    ctx2.sql("SELECT a, b, mojo_add(a, b) AS result FROM t").collect()
)
```

How it works
Each call to `mojo_add` inside a DataFusion query goes through these steps:

1. DataFusion evaluates the query plan and produces a `pyarrow.Array` chunk for each argument column.
2. `ma.array(a)` imports the array into Marrow via `__arrow_c_array__`: a pointer exchange, no copy.
3. `ma.add(..., None)` runs the SIMD-vectorised Mojo kernel over the raw Arrow buffers.
4. `pa.array(result)` exports the result back to PyArrow via `__arrow_c_array__`, again without a copy.

The data layout is identical in both libraries, so steps 2 and 4 are O(1) pointer operations regardless of array length.