Getting started with batchcorder

This tutorial guides you through installing batchcorder and caching your first Arrow stream. By the end you will understand the core loop: wrap a stream, create readers, and replay batches from the cache.

What you will build

A script that wraps a PyArrow table in a StreamCache, reads it twice through two independent reader handles, and confirms both produce identical output — without ever touching the original source a second time.

Prerequisites

  • Python 3.11 or later

  • PyArrow, which every example below imports (pip install pyarrow if it is not already installed)

Step 1 — Install

Install batchcorder from PyPI:

pip install batchcorder

Verify the installation:

import batchcorder
print(batchcorder.__version__)
0.1.3

Step 2 — Wrap a PyArrow table in a StreamCache

Any object that implements __arrow_c_stream__ can be wrapped — a pyarrow.Table, a pyarrow.RecordBatchReader, etc.

import tempfile
import pyarrow as pa
from batchcorder import StreamCache

# Source data — in practice this would be a file, network stream, etc.
table = pa.table({"id": [1, 2, 3, 4, 5], "value": [0.1, 0.2, 0.3, 0.4, 0.5]})

# Choose cache sizes appropriate for your data volume.
tmp = tempfile.mkdtemp()
ds = StreamCache(
    table,                        # any __arrow_c_stream__ source
    memory_capacity=16 << 20,     # 16 MiB (16 * 1024 * 1024 bytes) in-memory tier
    disk_path=tmp,                # directory for the on-disk tier
    disk_capacity=64 << 20,       # 64 MiB on-disk tier
)

print(ds.schema)
# schema: id: int64, value: double

print(ds.upstream_exhausted)
# False — nothing ingested yet
id: int64
value: double
False
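
Step 2 says that any object implementing __arrow_c_stream__ can be wrapped. A minimal sketch of checking for that method by duck typing before handing an object to StreamCache is shown below. The helper supports_arrow_stream and the FakeStream class are hypothetical names used only for illustration; they are not part of batchcorder or PyArrow, and the check confirms only that the method exists, not that it returns a valid stream.

```python
def supports_arrow_stream(obj: object) -> bool:
    """Return True if obj exposes a callable __arrow_c_stream__ method."""
    return callable(getattr(obj, "__arrow_c_stream__", None))


class FakeStream:
    """Hypothetical stand-in; a real source would return an Arrow stream capsule."""

    def __arrow_c_stream__(self, requested_schema=None):
        raise NotImplementedError("illustration only")


# An object with the method passes the check; a plain list does not.
has_protocol = supports_arrow_stream(FakeStream())
lacks_protocol = supports_arrow_stream([1, 2, 3])
print(has_protocol, lacks_protocol)
```

With PyArrow installed, the same helper can be called on the table from Step 2 to see whether your installed version exposes the method.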

Step 3 — Read through the stream

Iterating directly over ds creates a fresh reader starting at batch 0:

for batch in ds:
    print(batch.num_rows, "rows")
# 5 rows  (one batch for the whole table by default)

print(ds.upstream_exhausted)
# True — the upstream source has been consumed
5 rows
True

Step 4 — Replay with a second reader

Because the data is now in the cache, you can create more readers and replay from the start as many times as needed:

r1 = ds.reader()   # starts at batch 0
r2 = ds.reader()   # independent handle, also at batch 0

result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()

assert result1.equals(result2)
print("Both readers produced identical output.")
Both readers produced identical output.

Notice that r1 and r2 advance their positions independently. r1 finishing does not affect r2, and neither triggers another read from the original table object.
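
To make the independence of readers concrete, here is a toy model in plain Python. It is not batchcorder's implementation: ToyReplayCache and its upstream_pulls counter are hypothetical, the cache is an unbounded list rather than a memory and disk tier, and strings stand in for record batches. The sketch only shows the idea that each reader keeps its own position while the upstream source is pulled at most once per item.

```python
from typing import Any, Iterable, Iterator, List


class ToyReplayCache:
    """Toy replay cache: fills lazily from upstream, at most once per item."""

    def __init__(self, upstream: Iterable[Any]) -> None:
        self._upstream: Iterator[Any] = iter(upstream)
        self._cached: List[Any] = []
        self.upstream_exhausted = False
        self.upstream_pulls = 0  # number of items fetched from upstream

    def _fill_to(self, index: int) -> bool:
        """Ensure item `index` is cached; return False if upstream ended first."""
        while len(self._cached) <= index and not self.upstream_exhausted:
            try:
                item = next(self._upstream)
            except StopIteration:
                self.upstream_exhausted = True
            else:
                self._cached.append(item)
                self.upstream_pulls += 1
        return index < len(self._cached)

    def reader(self) -> Iterator[Any]:
        """Return an independent cursor that starts at item 0."""
        position = 0
        while self._fill_to(position):
            yield self._cached[position]
            position += 1


cache = ToyReplayCache(iter(["batch-0", "batch-1", "batch-2"]))
r1, r2 = cache.reader(), cache.reader()

first = next(r1)      # r1 advances by one item; r2 is still at item 0
out2 = list(r2)       # r2 drains everything, pulling the rest from upstream
out1 = [first, *r1]   # r1 finishes entirely from the cache

print(out1 == out2, cache.upstream_pulls, cache.upstream_exhausted)
```

Even though r2 overtakes r1, both see the same three items, and the counter shows the source was read three times in total rather than once per reader.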

Step 5 — Pre-ingest upfront

If you know you will replay the stream many times, call ingest_all() once at startup to pull everything into the cache before your main loop runs:

# Reuses table and tmp from Step 2.
ds2 = StreamCache(
    table,
    memory_capacity=16 << 20,
    disk_path=tmp,
    disk_capacity=64 << 20,
)
total_batches = ds2.ingest_all()
print(f"Ingested {total_batches} batch(es).")

# All subsequent reads come from cache — no upstream I/O.
for _ in range(10):
    batches = list(ds2)
    print(len(batches), "batches replayed")
Ingested 1 batch(es).
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
1 batches replayed
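
If you want to check that front-loading pays off for your own data, one option is to time each pass of the replay loop. The sketch below uses only the standard library; time_passes is a hypothetical helper, not a batchcorder function, and a plain list stands in for the cache so the block runs on its own. It assumes only that the object can be iterated repeatedly, as ds2 is in the loop above.

```python
import time
from typing import Iterable, List


def time_passes(source: Iterable, passes: int) -> List[float]:
    """Iterate over `source` `passes` times; return seconds taken per pass."""
    durations = []
    for _ in range(passes):
        start = time.perf_counter()
        for _batch in source:
            pass  # a real loop would process the batch here
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in data; replace with ds2 (or any re-iterable source) in practice.
stand_in = [b"batch-0", b"batch-1"]
durations = time_passes(stand_in, passes=3)
print([f"{d:.6f}s" for d in durations])
```

Comparing the first duration with the later ones, with and without a prior ingest_all() call, shows where the upstream I/O cost actually lands.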

What you have learned

  • StreamCache wraps any Arrow stream and stores its batches in a cache with an in-memory tier and an on-disk tier.

  • The upstream source is consumed at most once; all replays come from the cache.

  • StreamCacheReader handles are independent — creating or exhausting one does not affect others.

  • ingest_all() lets you front-load all I/O before a replay-heavy loop.

Next steps