Expression Execution

The marrow/expr module provides a lazy expression system for building and executing query plans. Expressions are composed into a logical plan tree, then compiled by a Planner into a pull-based processor hierarchy that streams RecordBatch results.

Building expressions

Column references and literals

col(name) references a column by name. lit(value) wraps a scalar constant.

from marrow.expr import col, lit

age  = col("age")
name = col("name")
threshold = lit(30)

Operator overloads

AnyValue overloads the standard Python operators. Applying one builds an expression node instead of computing a result:

Operator                  Expression node
+, -, *, /                Binary(ADD / SUB / MUL / DIV)
>, <, >=, <=, ==, !=      Binary(GT / LT / GE / LE / EQ / NE)
&, |                      Binary(AND / OR)
-expr                     Unary(NEG)
~expr                     Unary(NOT)

adults    = col("age") > lit(30)
cheap     = col("price") <= lit(100)
qualified = adults & cheap
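
To illustrate how operator overloads can build a tree rather than evaluate anything, here is a minimal pure-Python sketch. The Expr, Col, Lit, and Binary classes are hypothetical stand-ins written for this example; they are not marrow's actual node types. One Python detail worth keeping in mind: & and | bind more tightly than comparison operators, so comparisons combined inline need parentheses.

```python
from dataclasses import dataclass


class Expr:
    """Hypothetical base class: operators return new nodes, not values."""

    def __gt__(self, other):
        return Binary("GT", self, other)

    def __le__(self, other):
        return Binary("LE", self, other)

    def __and__(self, other):
        return Binary("AND", self, other)


@dataclass(eq=False)
class Col(Expr):
    name: str


@dataclass(eq=False)
class Lit(Expr):
    value: object


@dataclass(eq=False)
class Binary(Expr):
    op: str
    left: Expr
    right: Expr


# Parentheses are required because & has higher precedence than > and <=.
tree = (Col("age") > Lit(30)) & (Col("price") <= Lit(100))
print(tree.op, tree.left.op, tree.right.op)  # AND GT LE
```

Nothing is computed when the tree is built; the root is an AND node whose children are the two comparison nodes.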

if_else

Element-wise conditional: if_else(condition, true_value, false_value).

from marrow.expr import if_else

label = if_else(col("score") >= lit(90), lit("pass"), lit("fail"))
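
The element-wise behaviour can be sketched in plain Python over lists. The if_else_sketch function below is a hypothetical illustration, not marrow's implementation; it repeats the scalar labels by hand to mimic a literal being applied to every row, and it does not model nulls because the source does not state how a null condition is handled.

```python
def if_else_sketch(condition, true_values, false_values):
    """Take from true_values where condition is True, else from false_values."""
    return [t if c else f for c, t, f in zip(condition, true_values, false_values)]


scores = [95, 72, 90]
condition = [s >= 90 for s in scores]
labels = if_else_sketch(condition, ["pass"] * 3, ["fail"] * 3)
print(labels)  # ['pass', 'fail', 'pass']
```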

is_null and cast

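.is_null() yields a boolean expression that is true where the value is null. .cast(dtype) converts an expression's values to the given data type.

import marrow as ma  # assumption: ma is the top-level marrow package
from marrow.expr import col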

missing = col("optional_field").is_null()
as_f64  = col("price").cast(ma.float64())

Relational plan nodes

Plan nodes represent transformations on tabular data. They are composed via chaining.
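
As a rough picture of what chaining means, the sketch below has each method return a new node that wraps the node it was called on. PlanNode and kinds are hypothetical names invented for this example, and predicates are plain strings here rather than expression objects.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PlanNode:
    """Hypothetical plan node: every method wraps self in a new parent node."""
    kind: str
    args: tuple = ()
    child: Optional["PlanNode"] = None

    def filter(self, predicate):
        return PlanNode("Filter", (predicate,), self)

    def select(self, *exprs):
        return PlanNode("Project", exprs, self)


def kinds(node):
    """List node kinds from the root of the plan down to the leaf."""
    out = []
    while node is not None:
        out.append(node.kind)
        node = node.child
    return out


plan = PlanNode("InMemoryTable").filter("age > 30").select("name", "age")
print(kinds(plan))  # ['Project', 'Filter', 'InMemoryTable']
```

The last call in the chain becomes the root of the tree, and the source sits at the leaf.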

InMemoryTable

in_memory_table(batch) wraps a RecordBatch as a leaf node.

import marrow as ma  # assumption: ma is the top-level marrow package
from marrow.expr import in_memory_table

batch = ma.record_batch(
    [ma.array([25, 35, 45, 20]), ma.array(["Alice", "Bob", "Carol", "Dave"])],
    names=["age", "name"],
)
source = in_memory_table(batch)

ParquetScan

parquet_scan(path) reads a Parquet file lazily — the file is read when the plan is executed, not when the node is constructed.

from marrow.expr import parquet_scan

source = parquet_scan("employees.parquet")
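
The general pattern behind a lazy scan can be sketched as follows. LazyScanSketch is a hypothetical class for illustration only; it reads raw bytes instead of decoding Parquet and says nothing about how marrow implements its scan node.

```python
class LazyScanSketch:
    """Hypothetical leaf node: stores the path and opens nothing up front."""

    def __init__(self, path):
        self.path = path  # no I/O happens here

    def execute(self):
        # The file is touched only when the plan actually runs.
        with open(self.path, "rb") as f:
            return f.read()


# Constructing the node succeeds even though the file is absent.
node = LazyScanSketch("no_such_file_for_lazy_sketch.parquet")
try:
    node.execute()
    outcome = "read"
except FileNotFoundError:
    outcome = "missing file detected at execution time"
print(outcome)
```

If a scan node behaves like this sketch, a wrong path would surface when the plan is executed rather than when it is built.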

Filter

.filter(predicate) keeps rows where the boolean expression evaluates to True. Rows where the predicate is False or null are excluded.

adults = source.filter(col("age") > lit(30))
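
The null rule is easy to miss, so here is a small sketch of it in plain Python, with None standing in for null. filter_sketch is a hypothetical helper, not part of marrow.

```python
def filter_sketch(rows, mask):
    """Keep rows whose mask entry is exactly True; False and None are dropped."""
    return [row for row, keep in zip(rows, mask) if keep is True]


ages = [25, None, 45, 20]
# Comparing a null age yields a null predicate result rather than False.
mask = [None if a is None else a > 30 for a in ages]
kept = filter_sketch(ages, mask)
print(kept)  # [45]
```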

Project

.select(*exprs) evaluates each expression and produces output whose schema contains only the selected or computed columns.

projected = adults.select(col("name"), col("age"), col("age") - lit(18))

Note: alias() is not yet implemented in the expression layer, so a computed column such as col("age") - lit(18) cannot be renamed. Column names in Project follow the expression tree structure.

Executing a plan

ExecutionContext configures execution parameters. execute(plan, ctx) runs the plan and returns a List[RecordBatch].

from marrow.expr import execute, ExecutionContext

# plan is any plan node, for example one built with filter() and select()
ctx = ExecutionContext()
batches = execute(plan, ctx)

ExecutionContext options

Parameter     Default   Description
morsel_size   1024      Number of rows per output batch
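
To make the pull-based, batch-at-a-time model concrete, the sketch below chains Python generators: each operator pulls a batch from its child only when its own consumer asks for one. The scan and filter_op functions are hypothetical, and rows are plain integers rather than RecordBatch columns. In this sketch morsel_size bounds the chunks produced by the leaf, so batches after the filter can be smaller; whether marrow re-chunks its output is not stated in the source.

```python
def scan(rows, morsel_size):
    """Leaf operator: yield the input in chunks of at most morsel_size rows."""
    for start in range(0, len(rows), morsel_size):
        yield rows[start:start + morsel_size]


def filter_op(child, predicate):
    """Pull batches from the child and keep only the matching rows."""
    for batch in child:
        kept = [row for row in batch if predicate(row)]
        if kept:  # skip batches that end up empty
            yield kept


rows = list(range(10))
pipeline = filter_op(scan(rows, morsel_size=4), lambda r: r % 2 == 0)
batches = list(pipeline)  # nothing runs until the pipeline is consumed
print(batches)  # [[0, 2], [4, 6], [8]]
```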

End-to-end example

import marrow as ma  # assumption: ma is the top-level marrow package
from marrow.expr import col, lit, in_memory_table, execute, ExecutionContext

# Source data
batch = ma.record_batch(
    [
        ma.array([25, 35, 45, 20, 55]),
        ma.array(["Alice", "Bob", "Carol", "Dave", "Eve"]),
        ma.array([70_000, 90_000, 110_000, 50_000, 130_000]),
    ],
    names=["age", "name", "salary"],
)

# Build a plan: keep employees over 30 with salary > 80k, return name + salary
plan = (
    in_memory_table(batch)
    .filter((col("age") > lit(30)) & (col("salary") > lit(80_000)))
    .select(col("name"), col("salary"))
)

# Execute
ctx = ExecutionContext()
results = execute(plan, ctx)

for rb in results:
    print(rb)

Reading from Parquet

Plans that scan Parquet files work identically. The file is read on demand during execution.

from marrow.expr import col, lit, parquet_scan, execute, ExecutionContext

plan = (
    parquet_scan("employees.parquet")
    .filter(col("age") > lit(30))
    .select(col("name"), col("age"))
)

ctx = ExecutionContext()
results = execute(plan, ctx)