Expression Execution
The marrow/expr module provides a lazy expression system for building and executing query plans. Expressions are composed into a logical plan tree, then compiled by a Planner into a pull-based processor hierarchy that streams RecordBatch results.
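To make "pull-based" concrete, here is a minimal sketch in plain Python. It is not marrow's Planner or its processor classes: the stage names `scan`, `filter_rows`, and `project`, and the list-of-dicts stand-in for a RecordBatch, are assumptions made purely for illustration. Each stage is a generator that asks its child for the next batch only when its own consumer asks for one.

```python
from typing import Callable, Dict, Iterator, List

Row = Dict[str, object]
Batch = List[Row]  # hypothetical stand-in for a RecordBatch


def scan(rows: List[Row], batch_size: int) -> Iterator[Batch]:
    """Leaf stage: yield the input in batches of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def filter_rows(child: Iterator[Batch], keep: Callable[[Row], bool]) -> Iterator[Batch]:
    """Pull one batch from the child, keep the matching rows, pass it on."""
    for batch in child:
        kept = [row for row in batch if keep(row)]
        if kept:  # skip batches that end up empty
            yield kept


def project(child: Iterator[Batch], columns: List[str]) -> Iterator[Batch]:
    """Pull one batch from the child and keep only the named columns."""
    for batch in child:
        yield [{name: row[name] for name in columns} for row in batch]


rows = [
    {"age": 25, "name": "Alice"},
    {"age": 35, "name": "Bob"},
    {"age": 45, "name": "Carol"},
    {"age": 20, "name": "Dave"},
]

# Building the pipeline does no work; batches flow only when it is consumed.
pipeline = project(
    filter_rows(scan(rows, batch_size=2), lambda r: r["age"] > 30),
    ["name"],
)
batches = list(pipeline)
print(batches)
```

The consumer drives the whole chain, so at most one batch per stage is in flight at a time in this sketch.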
Building expressions
Column references and literals
col(name) references a column by name. lit(value) wraps a scalar constant.
from marrow.expr import col, lit
age = col("age")
name = col("name")
threshold = lit(30)

Operator overloads
AnyValue supports standard Python operators, which compose into expression nodes:
| Operator | Expression node |
|---|---|
| +, -, *, / | Binary(ADD / SUB / MUL / DIV) |
| >, <, >=, <=, ==, != | Binary(GT / LT / GE / LE / EQ / NE) |
| &, \| | Binary(AND / OR) |
| -expr | Unary(NEG) |
| ~expr | Unary(NOT) |
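How overloads compose into nodes can be sketched in plain Python. The classes below (`Expr`, `Col`, `Lit`, `Binary`, `Unary`) are hypothetical stand-ins written for this example, not marrow's own types, and only a few of the operators from the table are implemented.

```python
from dataclasses import dataclass
from typing import Any


class Expr:
    """Base class: each overloaded operator returns a new tree node."""

    def __gt__(self, other: "Expr") -> "Binary":
        return Binary("GT", self, other)

    def __le__(self, other: "Expr") -> "Binary":
        return Binary("LE", self, other)

    def __and__(self, other: "Expr") -> "Binary":
        return Binary("AND", self, other)

    def __invert__(self) -> "Unary":
        return Unary("NOT", self)


# eq=False keeps dataclass from generating __eq__, so a real expression
# class would remain free to overload == as an EQ node.
@dataclass(eq=False)
class Col(Expr):
    name: str


@dataclass(eq=False)
class Lit(Expr):
    value: Any


@dataclass(eq=False)
class Binary(Expr):
    op: str
    left: Expr
    right: Expr


@dataclass(eq=False)
class Unary(Expr):
    op: str
    operand: Expr


# Nothing is evaluated here; the operators only build a tree.
expr = (Col("age") > Lit(30)) & ~(Col("price") <= Lit(100))
print(expr)
```

In Python, `&` and `|` bind more tightly than comparison operators, so each comparison needs its own parentheses when combined inline, as in `(a > b) & (c <= d)`.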
adults = col("age") > lit(30)
cheap = col("price") <= lit(100)
qualified = adults & cheap

if_else
Element-wise conditional: if_else(condition, true_value, false_value).
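"Element-wise" means the condition is checked row by row, and each output row takes its value from the matching row of one of the two inputs. The sketch below shows that behaviour on plain Python lists. The helper `if_else_rows` is hypothetical, and literals are modelled by repeating the constant once per row, which is an assumption about how a scalar is broadcast.

```python
from typing import List, TypeVar

T = TypeVar("T")


def if_else_rows(condition: List[bool], true_values: List[T], false_values: List[T]) -> List[T]:
    """For each row i, pick true_values[i] if condition[i] holds, else false_values[i]."""
    return [t if cond else f for cond, t, f in zip(condition, true_values, false_values)]


scores = [95, 72, 90]
condition = [score >= 90 for score in scores]

# A literal is modelled as the same constant repeated for every row.
labels = if_else_rows(condition, ["pass"] * len(scores), ["fail"] * len(scores))
print(labels)
```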
from marrow.expr import if_else
label = if_else(col("score") >= lit(90), lit("pass"), lit("fail"))

is_null and cast
import marrow as ma  # assumed: ma is the top-level marrow package
from marrow.expr import col
missing = col("optional_field").is_null()
as_f64 = col("price").cast(ma.float64())

Relational plan nodes
Plan nodes represent transformations on tabular data. They are composed via chaining.
InMemoryTable
in_memory_table(batch) wraps a RecordBatch as a leaf node.
import marrow as ma  # assumed: ma is the top-level marrow package
from marrow.expr import in_memory_table
batch = ma.record_batch(
    [ma.array([25, 35, 45, 20]), ma.array(["Alice", "Bob", "Carol", "Dave"])],
    names=["age", "name"],
)
source = in_memory_table(batch)

ParquetScan
parquet_scan(path) reads a Parquet file lazily — the file is read when the plan is executed, not when the node is constructed.
from marrow.expr import parquet_scan
source = parquet_scan("employees.parquet")

Filter
.filter(predicate) keeps rows where the boolean expression is True. Rows where the predicate evaluates to null are excluded.
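The keep-only-True rule can be sketched in plain Python. This sketch assumes that comparing a null value yields a null predicate, as in SQL. The source does not state how marrow evaluates comparisons on nulls, and `filter_mask` is a hypothetical helper.

```python
from typing import List, Optional


def filter_mask(predicate: List[Optional[bool]]) -> List[int]:
    """Return the indices of rows to keep: only those whose predicate is exactly True."""
    return [i for i, value in enumerate(predicate) if value is True]


ages = [25, None, 45, 35]

# Assumption: a comparison involving a null produces a null (None) predicate.
predicate = [None if age is None else age > 30 for age in ages]

kept = [ages[i] for i in filter_mask(predicate)]
print(kept)
```

Both the False row (25) and the null row are dropped; only rows whose predicate is True survive.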
adults = source.filter(col("age") > lit(30))

Project
.select(*exprs) evaluates expressions and returns a new schema with the selected/computed columns.
projected = adults.select(col("name"), col("age"), (col("age") - lit(18)).alias("years_of_work"))

Note: alias() is not yet implemented in the expression layer, so output column names in Project follow the expression tree structure.
Executing a plan
ExecutionContext configures execution parameters. execute(plan, ctx) runs the plan and returns a List[RecordBatch].
from marrow.expr import execute, ExecutionContext
ctx = ExecutionContext()
batches = execute(plan, ctx)

ExecutionContext options
| Parameter | Default | Description |
|---|---|---|
| morsel_size | 1024 | Number of rows per output batch |
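As a rough illustration of what a per-batch row limit implies, the sketch below splits a sequence of rows into chunks of at most `morsel_size`. It assumes the value is an upper bound and that the final batch may be shorter. The helper `split_into_morsels` is hypothetical, and the source does not say how marrow sizes batches after a filter removes rows.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_morsels(rows: Sequence[T], morsel_size: int = 1024) -> List[Sequence[T]]:
    """Split rows into consecutive chunks of at most morsel_size rows each."""
    if morsel_size <= 0:
        raise ValueError("morsel_size must be positive")
    return [rows[start:start + morsel_size] for start in range(0, len(rows), morsel_size)]


morsels = split_into_morsels(list(range(2500)), morsel_size=1024)
sizes = [len(morsel) for morsel in morsels]
print(sizes)
```

Under these assumptions, 2500 input rows with the default size would arrive as three batches, the last one partial.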
End-to-end example
import marrow as ma  # assumed: ma is the top-level marrow package
from marrow.expr import col, lit, in_memory_table, execute, ExecutionContext
# Source data
batch = ma.record_batch(
    [
        ma.array([25, 35, 45, 20, 55]),
        ma.array(["Alice", "Bob", "Carol", "Dave", "Eve"]),
        ma.array([70_000, 90_000, 110_000, 50_000, 130_000]),
    ],
    names=["age", "name", "salary"],
)
# Build a plan: keep employees over 30 with salary > 80k, return name + salary
plan = (
    in_memory_table(batch)
    .filter((col("age") > lit(30)) & (col("salary") > lit(80_000)))
    .select(col("name"), col("salary"))
)
# Execute
ctx = ExecutionContext()
results = execute(plan, ctx)
for rb in results:
    print(rb)

Reading from Parquet
Plans that scan Parquet files work identically. The file is read on demand during execution.
from marrow.expr import col, lit, parquet_scan, execute, ExecutionContext
plan = (
    parquet_scan("employees.parquet")
    .filter(col("age") > lit(30))
    .select(col("name"), col("age"))
)
ctx = ExecutionContext()
results = execute(plan, ctx)
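The "read on demand" behaviour can be sketched with a small stand-in class. `LazyFileScan` is hypothetical and reads a text file rather than Parquet. It only illustrates the idea that constructing a node stores the path, while the I/O happens at execution time.

```python
import os
import tempfile
from typing import List


class LazyFileScan:
    """Hypothetical stand-in for a lazy scan node: stores the path, reads later."""

    def __init__(self, path: str) -> None:
        self.path = path  # no I/O happens here

    def execute(self) -> List[str]:
        # The file is opened only when the node is executed.
        with open(self.path, encoding="utf-8") as handle:
            return handle.read().splitlines()


# Constructing the node for a file that does not exist yet raises no error.
path = os.path.join(tempfile.mkdtemp(), "employees.txt")
node = LazyFileScan(path)

# Create the file afterwards; execution still sees it.
with open(path, "w", encoding="utf-8") as handle:
    handle.write("Alice\nBob\n")

lines = node.execute()
print(lines)
```

In this sketch a missing or unreadable file surfaces as an error from `execute()`, not from the constructor. Whether marrow's parquet_scan validates the path earlier is not stated in the source.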