Use batchcorder with DuckDB
DuckDB scans an Arrow stream source by calling `__arrow_c_stream__()` once per scan. A query that scans the same source more than once, such as a self-join, an ASOF JOIN with the same source on both sides, or a CTE referenced multiple times, gets empty results from the second scan because the original RecordBatchReader is already exhausted.
Wrapping the source in a StreamCache eliminates this problem: each call to `__arrow_c_stream__()` creates a fresh reader starting at batch 0, so DuckDB sees the full stream on every scan.
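A plain-Python model can make the replay behaviour concrete. The sketch below is not batchcorder's implementation: `ReplayCache` and its `reader()` method are hypothetical names, batches are Python lists rather than Arrow record batches, and everything stays in memory (no size budget, no spill directory).

```python
from typing import Iterable, Iterator, List

_END = object()  # sentinel marking an exhausted source


class ReplayCache:
    """Hypothetical in-memory model of a replayable stream cache.

    Batches are pulled from a one-shot source lazily and kept in a list,
    so every call to reader() starts again at batch 0.
    """

    def __init__(self, source: Iterable[list]) -> None:
        self._source = iter(source)
        self._batches: List[list] = []
        self._done = False

    def _pull(self) -> bool:
        """Fetch one more batch from the source; return False once it is exhausted."""
        if self._done:
            return False
        batch = next(self._source, _END)
        if batch is _END:
            self._done = True
            return False
        self._batches.append(batch)
        return True

    def reader(self) -> Iterator[list]:
        """Return a fresh iterator: replay cached batches, then pull new ones."""
        i = 0
        while i < len(self._batches) or self._pull():
            yield self._batches[i]
            i += 1


# A plain iterator is one-shot: the second pass sees nothing.
one_shot = iter([[1, 2], [3, 4]])
first = [row for batch in one_shot for row in batch]   # [1, 2, 3, 4]
second = [row for batch in one_shot for row in batch]  # []

# The cache replays from batch 0 on every reader() call.
cache = ReplayCache(iter([[1, 2], [3, 4]]))
scan1 = [row for batch in cache.reader() for row in batch]  # [1, 2, 3, 4]
scan2 = [row for batch in cache.reader() for row in batch]  # [1, 2, 3, 4]
```

In this sketch, pulling lazily means a reader that stops early never forces the rest of the source to be read, while later readers still replay everything that has been pulled so far.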
Prerequisites
- batchcorder installed (see Getting started)
- duckdb installed: `uv add duckdb`
Self-join
```python
import tempfile

import duckdb
import pyarrow as pa

from batchcorder import StreamCache

employees = pa.table({
    "id": [1, 2, 3, 4],
    "name": ["Alice", "Bob", "Carol", "Dave"],
    "manager_id": [None, 1, 1, 2],
})

tmp = tempfile.mkdtemp()
ds = StreamCache(employees, 16 << 20, tmp, 64 << 20)

# DuckDB scans `ds` once per alias (e and m). A one-shot RecordBatchReader
# (for example employees.to_reader()) would return zero rows on the second
# scan; StreamCache hands each scan a fresh reader instead.
result = duckdb.sql("""
    SELECT e.name AS employee, m.name AS manager
    FROM ds AS e
    LEFT JOIN ds AS m ON e.manager_id = m.id
""").fetchdf()
print(result)
```
```text
   employee  manager
0       Bob    Alice
1     Carol    Alice
2      Dave      Bob
3     Alice      NaN
```
DuckDB calls __arrow_c_stream__() twice — once for each alias of ds.
Because StreamCache returns a fresh StreamCacheReader on every call,
both scans see all four rows.
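The effect of the second scan can be modelled in plain Python. `self_join` is a hypothetical helper that, like DuckDB, asks its source for a new stream once per table alias; Python iterators stand in for Arrow readers. Handing it the same one-shot iterator twice reproduces the failure, while handing it a factory that returns a fresh iterator on each call (the role `__arrow_c_stream__()` plays for StreamCache) gives the expected pairs.

```python
from typing import Callable, Dict, Iterator, List, Optional, Tuple

# (id, name, manager_id) rows matching the employees table above
Row = Tuple[int, str, Optional[int]]
ROWS: List[Row] = [(1, "Alice", None), (2, "Bob", 1), (3, "Carol", 1), (4, "Dave", 2)]


def self_join(scan: Callable[[], Iterator[Row]]) -> List[Tuple[str, Optional[str]]]:
    """Hypothetical LEFT JOIN of a source with itself, calling scan() once per alias."""
    left = list(scan())                                        # scan for alias e
    names: Dict[int, str] = {rid: name for rid, name, _ in scan()}  # scan for alias m
    # No manager, or no matching row on the right side, yields None (SQL NULL).
    return [(name, names.get(mgr) if mgr is not None else None) for _, name, mgr in left]


# Reusing one iterator: the second scan finds it exhausted, so every manager is None.
shared = iter(ROWS)
broken = self_join(lambda: shared)

# A fresh iterator per scan, which is what StreamCache provides.
fixed = self_join(lambda: iter(ROWS))
```

With a LEFT JOIN the failure shows up as NULL managers rather than missing rows; an inner join over the same exhausted source would return zero rows.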
ASOF join (time-series lookup)
ASOF joins are a common pattern in event-stream analytics: for each event, find the most recent reference row whose timestamp is ≤ the event timestamp.
```python
import tempfile

import duckdb
import pyarrow as pa

from batchcorder import StreamCache

events = pa.table({
    "ts": pa.array([1, 3, 5, 7], type=pa.int64()),
    "value": [10, 20, 30, 40],
})
prices = pa.table({
    "ts": pa.array([0, 2, 4, 6], type=pa.int64()),
    "price": [100.0, 101.0, 99.0, 102.0],
})

tmp = tempfile.mkdtemp()
events_ds = StreamCache(events, 16 << 20, tmp, 64 << 20)
prices_ds = StreamCache(prices, 16 << 20, tmp, 64 << 20)

result = duckdb.sql("""
    SELECT e.ts, e.value, p.price
    FROM events_ds AS e
    ASOF JOIN prices_ds AS p ON e.ts >= p.ts
""").fetchdf()
print(result)
```
```text
   ts  value  price
0   3     20  101.0
1   1     10  100.0
2   5     30   99.0
3   7     40  102.0
```
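Each stream is scanned only once in this query, but wrapping both means later queries that reference `events_ds` or `prices_ds` again still see every row. The query has no ORDER BY, so DuckDB is free to return rows in any order, as the output above shows.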
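The matching rule `e.ts >= p.ts` can be checked by hand with a binary search over the sorted price timestamps. `asof_join` below is a hypothetical reference sketch of the join semantics only (it says nothing about how DuckDB executes ASOF JOIN), and it treats the plain ASOF JOIN as an inner join, dropping events that have no earlier price. Its result matches the output above once the rows are sorted by ts.

```python
import bisect
from typing import List, Tuple


def asof_join(events: List[Tuple[int, int]],
              prices: List[Tuple[int, float]]) -> List[Tuple[int, int, float]]:
    """Inner ASOF join: pair each (ts, value) event with the latest price whose ts <= event ts."""
    prices = sorted(prices)                  # binary search needs prices ordered by ts
    price_ts = [ts for ts, _ in prices]
    out = []
    for ts, value in events:
        i = bisect.bisect_right(price_ts, ts)  # count of price rows with ts <= event ts
        if i > 0:                              # events with no earlier price are dropped
            out.append((ts, value, prices[i - 1][1]))
    return out


events = [(1, 10), (3, 20), (5, 30), (7, 40)]
prices = [(0, 100.0), (2, 101.0), (4, 99.0), (6, 102.0)]
rows = asof_join(events, prices)
# [(1, 10, 100.0), (3, 20, 101.0), (5, 30, 99.0), (7, 40, 102.0)]
```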
Repeated queries over the same stream
When exploring data interactively, you often run many different queries over the same source without wanting to re-read the file each time:
```python
import tempfile

import duckdb
import pyarrow as pa
import pyarrow.parquet as pq

from batchcorder import StreamCache

# Open the large Parquet file as a one-shot stream of record batches,
# so it is read from disk only once.
pf = pq.ParquetFile("data/large_file.parquet")
reader = pa.RecordBatchReader.from_batches(pf.schema_arrow, pf.iter_batches())

tmp = tempfile.mkdtemp()
ds = StreamCache(reader, 256 << 20, tmp, 2 << 30)
ds.ingest_all()  # pull everything into the cache upfront

# Now run as many queries as you like; each one gets a fresh reader.
q1 = duckdb.sql("SELECT count(*) FROM ds").fetchone()
q2 = duckdb.sql("SELECT avg(value) FROM ds").fetchone()
q3 = duckdb.sql("SELECT * FROM ds WHERE id > 1000 LIMIT 10").fetchdf()
```
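The size arguments passed to StreamCache on this page are written as bit shifts: `x << 20` is x × 2^20 bytes (x MiB) and `x << 30` is x GiB. This page does not spell out what each positional size argument controls; assuming they are byte budgets (the third argument, `tmp`, is a directory path), the numbers work out as follows.

```python
MiB = 1 << 20  # 1,048,576 bytes

# Size expressions used in the StreamCache calls on this page
sizes = {
    "16 << 20": 16 << 20,
    "64 << 20": 64 << 20,
    "256 << 20": 256 << 20,
    "2 << 30": 2 << 30,
}

for expr, n in sizes.items():
    print(f"{expr} = {n:,} bytes = {n // MiB} MiB")
# 16 << 20 = 16,777,216 bytes = 16 MiB
# 64 << 20 = 67,108,864 bytes = 64 MiB
# 256 << 20 = 268,435,456 bytes = 256 MiB
# 2 << 30 = 2,147,483,648 bytes = 2048 MiB
```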