Use batchcorder with DuckDB

DuckDB scans an Arrow stream source by calling __arrow_c_stream__() once per scan. A query that scans the same source twice — a self-join, an ASOF JOIN, a CTE referenced multiple times — will produce empty results on the second scan because the original RecordBatchReader is already exhausted.

Wrapping the source in a StreamCache eliminates this problem: each call to __arrow_c_stream__() creates a fresh reader starting at batch 0, so DuckDB sees a full stream on every scan.
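Here is a toy, pure-Python model of the problem and the fix. It is not batchcorder's implementation. Lists of row dicts stand in for Arrow record batches, and a generator stands in for a one-shot RecordBatchReader. `ReplayCache` and `open_reader()` are hypothetical names that play the roles of StreamCache and __arrow_c_stream__().

```python
from typing import Iterator, List

Batch = List[dict]  # a "record batch" modeled as a list of row dicts


def one_shot_source() -> Iterator[Batch]:
    """Stands in for a RecordBatchReader: it can be iterated exactly once."""
    yield [{"id": 1}, {"id": 2}]
    yield [{"id": 3}, {"id": 4}]


def scan(reader: Iterator[Batch]) -> int:
    """Consume a reader completely and return the number of rows seen."""
    return sum(len(batch) for batch in reader)


# The problem: two scans over the same one-shot reader.
reader = one_shot_source()
first, second = scan(reader), scan(reader)
print(first, second)  # prints: 4 0  (the second scan finds the reader exhausted)


class ReplayCache:
    """Hypothetical stand-in for StreamCache: pulls batches from upstream
    lazily, remembers them, and hands out readers that start at batch 0."""

    def __init__(self, upstream: Iterator[Batch]) -> None:
        self._upstream = upstream
        self._batches: List[Batch] = []
        self._done = False

    def _fetch_next(self) -> bool:
        """Pull one more batch from upstream into the cache; False at the end."""
        if self._done:
            return False
        try:
            self._batches.append(next(self._upstream))
            return True
        except StopIteration:
            self._done = True
            return False

    def open_reader(self) -> Iterator[Batch]:
        """Plays the role of __arrow_c_stream__(): a fresh reader on each call.
        Cached batches are replayed first; new ones are fetched on demand."""
        i = 0
        while i < len(self._batches) or self._fetch_next():
            yield self._batches[i]
            i += 1


# The fix: every scan gets its own reader over the shared cache.
cache = ReplayCache(one_shot_source())
print(scan(cache.open_reader()), scan(cache.open_reader()))  # prints: 4 4
```

The upstream generator is still consumed only once. The cache holds the batches, and each reader keeps its own position.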

Prerequisites

  • batchcorder installed (see Getting started)

  • duckdb installed: uv add duckdb

Self-join

import tempfile
import pyarrow as pa
import duckdb
from batchcorder import StreamCache

employees = pa.table({
    "id":         [1,    2,    3,    4   ],
    "name":       ["Alice", "Bob", "Carol", "Dave"],
    "manager_id": [None, 1,    1,    2   ],
})

tmp = tempfile.mkdtemp()
ds = StreamCache(employees, 16 << 20, tmp, 64 << 20)

# If `employees` were handed to DuckDB as a one-shot RecordBatchReader
# (e.g. employees.to_reader()), the second scan would find it exhausted.
result = duckdb.sql("""
    SELECT e.name AS employee, m.name AS manager
    FROM ds AS e
    LEFT JOIN ds AS m ON e.manager_id = m.id
""").fetchdf()

print(result)
  employee manager
0      Bob   Alice
1    Carol   Alice
2     Dave     Bob
3    Alice     NaN

DuckDB calls __arrow_c_stream__() twice — once for each alias of ds. Because StreamCache returns a fresh StreamCacheReader on every call, both scans see all four rows.
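Below is a pure-Python toy of this query shape, not what DuckDB or batchcorder actually execute. It runs a LEFT JOIN that opens one reader per alias. `CachedSource` and `open_reader()` are hypothetical names. The point is that two independent readers over the cached batches both see all four employees.

```python
from typing import Dict, Iterator, List, Optional

Row = Dict[str, Optional[object]]


class CachedSource:
    """Hypothetical stand-in for StreamCache over already-cached batches."""

    def __init__(self, batches: List[List[Row]]) -> None:
        self._batches = batches
        self.readers_opened = 0  # how many times a scan asked for a stream

    def open_reader(self) -> Iterator[List[Row]]:
        """Like __arrow_c_stream__(): every call starts again at batch 0."""
        self.readers_opened += 1
        return iter(self._batches)


def rows(reader: Iterator[List[Row]]) -> List[Row]:
    """Flatten a reader's batches into a list of rows (one full scan)."""
    return [row for batch in reader for row in batch]


employees_cache = CachedSource([
    [{"id": 1, "name": "Alice", "manager_id": None},
     {"id": 2, "name": "Bob", "manager_id": 1}],
    [{"id": 3, "name": "Carol", "manager_id": 1},
     {"id": 4, "name": "Dave", "manager_id": 2}],
])

# One scan per alias, as in `FROM ds AS e LEFT JOIN ds AS m`.
e_rows = rows(employees_cache.open_reader())
m_by_id = {m["id"]: m["name"] for m in rows(employees_cache.open_reader())}

# LEFT JOIN: keep every employee; the manager is None when nothing matches.
result = [(e["name"], m_by_id.get(e["manager_id"])) for e in e_rows]
print(employees_cache.readers_opened, result)
```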

ASOF join (time-series lookup)

ASOF joins are a common pattern in event-stream analytics: for each event, find the most recent reference row whose timestamp is ≤ the event timestamp.
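The lookup rule itself is easy to state in plain Python. The sketch below uses `bisect` from the standard library, and `asof_lookup` is a hypothetical helper, not a DuckDB API. For each event, it finds the last reference row whose timestamp is ≤ the event's. It assumes the reference timestamps are sorted ascending, and it reuses the event and price timestamps from the example below.

```python
import bisect
from typing import List, Optional, Sequence


def asof_lookup(event_ts: Sequence[int], ref_ts: Sequence[int],
                ref_values: Sequence[float]) -> List[Optional[float]]:
    """For each event timestamp, return the value of the most recent reference
    row with ref_ts <= event_ts, or None if no such row exists.
    Assumes ref_ts is sorted in ascending order."""
    out: List[Optional[float]] = []
    for ts in event_ts:
        # bisect_right returns how many reference timestamps are <= ts.
        i = bisect.bisect_right(ref_ts, ts)
        out.append(ref_values[i - 1] if i > 0 else None)
    return out


prices = asof_lookup([1, 3, 5, 7], [0, 2, 4, 6], [100.0, 101.0, 99.0, 102.0])
print(prices)  # [100.0, 101.0, 99.0, 102.0]
```

Using `bisect_right` rather than `bisect_left` makes an exact timestamp match count as "most recent", which matches the ≤ condition.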

import tempfile
import pyarrow as pa
import duckdb
from batchcorder import StreamCache

events = pa.table({
    "ts":    pa.array([1, 3, 5, 7], type=pa.int64()),
    "value": [10, 20, 30, 40],
})
prices = pa.table({
    "ts":    pa.array([0, 2, 4, 6], type=pa.int64()),
    "price": [100.0, 101.0, 99.0, 102.0],
})

tmp = tempfile.mkdtemp()
events_ds = StreamCache(events, 16 << 20, tmp, 64 << 20)
prices_ds = StreamCache(prices, 16 << 20, tmp, 64 << 20)

result = duckdb.sql("""
    SELECT e.ts, e.value, p.price
    FROM events_ds AS e
    ASOF JOIN prices_ds AS p ON e.ts >= p.ts
    ORDER BY e.ts
""").fetchdf()

print(result)
   ts  value  price
0   1     10  100.0
1   3     20  101.0
2   5     30   99.0
3   7     40  102.0

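Each stream is scanned only once in this query, so a plain RecordBatchReader would also work here. Wrapping both in StreamCache lets you keep running queries against events_ds and prices_ds afterwards. An unwrapped reader would already be exhausted after the first query.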

Repeated queries over the same stream

When exploring data interactively, you often run many different queries over the same source without wanting to re-read the file each time:

import tempfile
import pyarrow as pa
import pyarrow.parquet as pq
import duckdb
from batchcorder import StreamCache

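# Open the Parquet file as a stream of record batches instead of loading
# the whole table into memory at once.
pf = pq.ParquetFile("data/large_file.parquet")
reader = pa.RecordBatchReader.from_batches(pf.schema_arrow, pf.iter_batches())

tmp = tempfile.mkdtemp()
ds = StreamCache(reader, 256 << 20, tmp, 2 << 30)
ds.ingest_all()   # read the file once, pulling every batch into the cache upfront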

# Now run as many queries as you like — each call creates a fresh reader.
q1 = duckdb.sql("SELECT count(*) FROM ds").fetchone()
q2 = duckdb.sql("SELECT avg(value) FROM ds").fetchone()
q3 = duckdb.sql("SELECT * FROM ds WHERE id > 1000 LIMIT 10").fetchdf()
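A toy model shows why this pays off. After an upfront ingest, every query replays the cached batches, and the upstream source is read only once. This is a pure-Python sketch, not batchcorder's implementation. `CountingUpstream`, `EagerCache` and `open_reader()` are hypothetical names, and the `ingest_all()` here only mirrors the name of StreamCache.ingest_all(), whose internals may differ. The rows are dicts with made-up `id` and `value` columns.

```python
from typing import Dict, Iterator, List

Row = Dict[str, float]
Batch = List[Row]


class CountingUpstream:
    """A one-shot source that counts how many batches it has produced."""

    def __init__(self, batches: List[Batch]) -> None:
        self._it = iter(batches)  # shared iterator, so it can be drained once
        self.batches_read = 0

    def __iter__(self) -> Iterator[Batch]:
        for batch in self._it:
            self.batches_read += 1
            yield batch


class EagerCache:
    """Toy cache: ingest_all() drains upstream once; readers replay the copy."""

    def __init__(self, upstream: CountingUpstream) -> None:
        self._upstream = upstream
        self._batches: List[Batch] = []

    def ingest_all(self) -> None:
        self._batches.extend(self._upstream)

    def open_reader(self) -> Iterator[Batch]:
        return iter(self._batches)


def rows(cache: EagerCache) -> Iterator[Row]:
    """One full scan over the cache, yielding individual rows."""
    for batch in cache.open_reader():
        yield from batch


upstream = CountingUpstream([
    [{"id": 1, "value": 2.0}, {"id": 2, "value": 4.0}],
    [{"id": 3, "value": 6.0}],
])
cache = EagerCache(upstream)
cache.ingest_all()

# Three "queries", each a fresh scan, like q1..q3 above.
count = sum(1 for _ in rows(cache))
avg = sum(r["value"] for r in rows(cache)) / count
filtered = [r["id"] for r in rows(cache) if r["id"] > 1]
print(count, avg, filtered, upstream.batches_read)  # 3 4.0 [2, 3] 2
```

Upstream is read only twice, once per batch, no matter how many queries follow.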