Getting started with batchcorder
This tutorial guides you through installing batchcorder and caching your first Arrow stream. By the end you will understand the core loop: wrap a stream, create readers, and replay batches from the cache.
What you will build
A script that wraps a PyArrow table in a StreamCache, reads it twice
through two independent reader handles, and confirms both produce
identical output — without ever touching the original source a second
time.
Prerequisites
Python 3.11 or later
Step 1: Install
Install batchcorder from PyPI:
pip install batchcorder
Verify the installation:
import batchcorder
print(batchcorder.__version__)
0.1.3
Step 2: Wrap a PyArrow table in a StreamCache
Any object that implements __arrow_c_stream__ can be wrapped — a
pyarrow.Table, a pyarrow.RecordBatchReader, etc.
import tempfile
import pyarrow as pa
from batchcorder import StreamCache
# Source data — in practice this would be a file, network stream, etc.
table = pa.table({"id": [1, 2, 3, 4, 5], "value": [0.1, 0.2, 0.3, 0.4, 0.5]})
# Choose cache sizes appropriate for your data volume.
tmp = tempfile.mkdtemp()
ds = StreamCache(
    table,                      # any __arrow_c_stream__ source
    memory_capacity=16 << 20,   # 16 MB in-memory tier
    disk_path=tmp,              # directory for the on-disk tier
    disk_capacity=64 << 20,     # 64 MB on-disk tier
)
print(ds.schema)
# schema: id: int64, value: double
print(ds.upstream_exhausted)
# False — nothing ingested yet
id: int64
value: double
False
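The capacities in Step 2 are written as bit shifts. As a small sketch, assuming the capacities are byte counts (as the "16 MB" and "64 MB" comments suggest), the helper below makes the arithmetic explicit. The name mib is hypothetical and is not part of batchcorder.

```python
def mib(n: int) -> int:
    """Return n mebibytes expressed in bytes (n * 1024 * 1024)."""
    # Shifting left by 20 bits multiplies by 2**20 = 1,048,576.
    return n << 20


memory_capacity = mib(16)  # same value as 16 << 20
disk_capacity = mib(64)    # same value as 64 << 20

print(memory_capacity)  # 16777216
print(disk_capacity)    # 67108864
```

Strictly speaking these values are mebibytes (multiples of 1024 × 1024 bytes), slightly larger than decimal megabytes.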
Step 3: Read through the stream
Iterating directly over ds creates a fresh reader starting at batch 0:
for batch in ds:
    print(batch.num_rows, "rows")
# 5 rows (one batch for the whole table by default)
print(ds.upstream_exhausted)
# True — the upstream source has been consumed
5 rows
True
Step 4: Replay with a second reader
Because the data is now in the cache, you can create more readers and replay from the start as many times as needed:
r1 = ds.reader() # starts at batch 0
r2 = ds.reader() # independent handle, also at batch 0
result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()
assert result1.equals(result2)
print("Both readers produced identical output.")
Both readers produced identical output.
Notice that r1 and r2 advance their positions independently. r1
finishing does not affect r2, and neither triggers another read from
the original table object.
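To make the replay behaviour concrete, here is a pure-Python toy model of the idea. It is a conceptual sketch only, not batchcorder's implementation: the class ToyReplayCache and its upstream_pulls counter are hypothetical names, and it keeps everything in a plain list rather than in memory and disk tiers.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


class ToyReplayCache:
    """Conceptual stand-in for a replayable stream cache (hypothetical)."""

    def __init__(self, source: Iterable[T]) -> None:
        self._upstream: Iterator[T] = iter(source)
        self._cached: List[T] = []
        self.upstream_exhausted = False
        self.upstream_pulls = 0  # number of items taken from the source

    def _fill_to(self, index: int) -> bool:
        """Pull from upstream until item `index` is cached.

        Returns False if the source ends before reaching `index`.
        """
        while len(self._cached) <= index and not self.upstream_exhausted:
            try:
                item = next(self._upstream)
            except StopIteration:
                self.upstream_exhausted = True
            else:
                self._cached.append(item)
                self.upstream_pulls += 1
        return index < len(self._cached)

    def reader(self) -> Iterator[T]:
        """Return an independent iterator that starts at item 0."""
        position = 0  # private to this reader
        while self._fill_to(position):
            yield self._cached[position]
            position += 1


cache = ToyReplayCache(iter(["batch-0", "batch-1", "batch-2"]))
r1, r2 = cache.reader(), cache.reader()

first = next(r1)     # r1 pulls batch-0 from the source
all_r2 = list(r2)    # r2 replays batch-0 from the cache, then pulls the rest
rest_r1 = list(r1)   # r1 resumes at its own position, served from the cache

print(first, all_r2, rest_r1)
print(cache.upstream_pulls, cache.upstream_exhausted)
```

Each reader keeps its own position, so exhausting r2 does not move r1, and the source is pulled once per item no matter how many readers replay it.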
Step 5: Pre-ingest upfront
If you know you will replay the stream many times, call ingest_all()
once at startup to pull everything into the cache before your main loop
runs:
ds2 = StreamCache(table, 16 << 20, tmp, 64 << 20)
total_batches = ds2.ingest_all()
print(f"Ingested {total_batches} batch(es).")
# All subsequent reads come from cache — no upstream I/O.
for _ in range(10):
    batches = list(ds2)
    print(len(batches), "batches replayed")
Ingested 1 batch(es).
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
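The directory created with tempfile.mkdtemp() in Step 2 is not removed automatically. One possible way to scope the on-disk tier to a block of code is the standard library's tempfile.TemporaryDirectory, sketched below. Whether a StreamCache must be released before its directory is deleted is not covered in this tutorial, so treat that part as an assumption; the StreamCache call appears only as a comment.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as cache_dir:
    # Build the cache and do all reads inside this block, for example:
    #   ds = StreamCache(table, memory_capacity=16 << 20,
    #                    disk_path=cache_dir, disk_capacity=64 << 20)
    existed_inside = os.path.isdir(cache_dir)

# On leaving the block, the directory and its contents are deleted.
exists_after = os.path.isdir(cache_dir)

print(existed_inside, exists_after)
```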
What you have learned
StreamCache wraps any Arrow stream and stores batches in a memory or disk cache.
The upstream source is consumed at most once; all replays come from the cache.
StreamCacheReader handles are independent: creating or exhausting one does not affect others.
ingest_all() lets you front-load all I/O before a replay-heavy loop.
Next steps
Use batchcorder with DuckDB for multi-scan queries.
Configure cache storage to choose between memory and disk modes.
Concurrent reads for multi-threaded patterns.