Concurrent reads
Multiple StreamCacheReader handles backed by the same StreamCache
are fully independent and thread-safe. This guide shows common patterns
for reading a cached stream from several threads or tasks
simultaneously.
Creating independent reader handles
Each call to StreamCache.reader() returns a fresh handle starting at
batch 0 (or the current ingestion frontier when from_start=False).
Handles share the underlying cache but maintain separate read positions.
import pyarrow as pa
from batchcorder import StreamCache
ds = StreamCache(source)
r1 = ds.reader() # starts at batch 0
r2 = ds.reader() # independent handle, also at batch 0
# r1 and r2 can advance at completely different rates.
result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()
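To make "shared cache, separate read positions" concrete, here is a minimal pure-Python model. `ToyCache` and `ToyReader` are hypothetical stand-ins, not batchcorder classes. The cache is a list of already-ingested batches plus one upstream iterator, and each handle owns nothing but an integer cursor. The `from_start=False` branch mimics the behaviour described above by starting the cursor at the frontier, which the toy takes to be the number of batches ingested so far.

```python
_END = object()  # sentinel: upstream is exhausted


class ToyCache:
    """Hypothetical model: one shared batch list plus one upstream iterator."""

    def __init__(self, source):
        self._upstream = iter(source)
        self._batches = []  # every batch ingested so far, shared by all readers

    def get(self, index):
        # Pull from upstream only until `index` exists (or upstream runs out).
        while len(self._batches) <= index:
            batch = next(self._upstream, _END)
            if batch is _END:
                return _END
            self._batches.append(batch)
        return self._batches[index]

    def reader(self, from_start=True):
        # from_start=False begins at the frontier: the next not-yet-ingested batch.
        start = 0 if from_start else len(self._batches)
        return ToyReader(self, start)


class ToyReader:
    """A handle owns nothing but its own cursor into the shared cache."""

    def __init__(self, cache, position):
        self._cache = cache
        self.position = position

    def __iter__(self):
        return self

    def __next__(self):
        batch = self._cache.get(self.position)
        if batch is _END:
            raise StopIteration
        self.position += 1
        return batch


cache = ToyCache(["b0", "b1", "b2", "b3"])
r1, r2 = cache.reader(), cache.reader()
first_two = [next(r1), next(r1)]       # r1 advances; the frontier is now 2
late = cache.reader(from_start=False)  # starts at batch 2
rest_r2 = list(r2)      # ['b0', 'b1', 'b2', 'b3']: r1 never moved r2's cursor
rest_late = list(late)  # ['b2', 'b3']
rest_r1 = list(r1)      # ['b2', 'b3']: served from the shared list
```

Because the cursor is the only per-handle state, creating a handle is cheap, and advancing one handle cannot affect another.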
Reading from multiple threads
StreamCache and StreamCacheReader are thread-safe. A common pattern
is to have each worker thread create and drain its own reader handle:
import threading
import pyarrow as pa
from batchcorder import StreamCache
ds = StreamCache(source)
results = [None] * 4
def read(i):
    results[i] = pa.RecordBatchReader.from_stream(ds.reader()).read_all()
threads = [threading.Thread(target=read, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
All threads share a single ingestion pass: the upstream source is read at most once, regardless of how many readers are active.
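One way to get the "read at most once" property is to keep a single upstream iterator behind a lock: whichever thread first needs batch n pulls it, and every other thread finds it already cached. The sketch below models that with `threading.Lock`. `LockedCache`, `read_all`, and `pull_count` are hypothetical names chosen for illustration; batchcorder's actual synchronization is not described here and may differ.

```python
import threading


class LockedCache:
    """Hypothetical thread-safe cache that reads its upstream exactly once."""

    def __init__(self, source):
        self._upstream = iter(source)
        self._batches = []
        self._exhausted = False
        self._lock = threading.Lock()
        self.pull_count = 0  # how many batches were actually read from upstream

    def get(self, index):
        with self._lock:
            # Whichever thread reaches the frontier first pulls from upstream;
            # later threads find the batch already cached.
            while len(self._batches) <= index and not self._exhausted:
                try:
                    self._batches.append(next(self._upstream))
                    self.pull_count += 1
                except StopIteration:
                    self._exhausted = True
            return self._batches[index] if index < len(self._batches) else None

    def read_all(self):
        # A local cursor plays the role of a private reader handle.
        out, i = [], 0
        while (batch := self.get(i)) is not None:
            out.append(batch)
            i += 1
        return out


cache = LockedCache(range(1000))
results = [None] * 4


def read(i):
    results[i] = cache.read_all()


threads = [threading.Thread(target=read, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(cache.pull_count)  # 1000, not 4000
```

Holding one lock for the whole lookup is the simplest correct choice for a model; it serializes cache access, which keeps the invariant (each upstream item is appended exactly once) easy to see.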
Pre-ingesting before concurrent access
If you want to guarantee that all data is in the cache before any reader
starts (useful when the upstream source may not be accessible later),
call ingest_all() first:
ds = StreamCache(source)
ds.ingest_all() # upstream fully consumed; cache is complete
# Safe to hand out readers to any number of threads now.
readers = [ds.reader() for _ in range(8)]
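The benefit of draining the source first is that readers created afterwards never touch upstream again. In this sketch, the hypothetical `FragileSource` stands in for a source that becomes unreachable (for example, a stream that gets closed), and the hypothetical `PreloadedCache` has an `ingest_all()` that reads it to the end. To stay short, the toy refuses to hand out readers before `ingest_all()`; StreamCache itself does not require that, as the threaded example shows.

```python
class FragileSource:
    """Hypothetical upstream that refuses to be read after close()."""

    def __init__(self, items):
        self._items = iter(items)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.closed:
            raise RuntimeError("upstream no longer accessible")
        return next(self._items)

    def close(self):
        self.closed = True


class PreloadedCache:
    """Hypothetical cache whose ingest_all() drains upstream eagerly."""

    def __init__(self, source):
        self._upstream = source
        self._batches = []
        self.complete = False

    def ingest_all(self):
        self._batches.extend(self._upstream)  # read upstream to the end, once
        self.complete = True

    def reader(self):
        if not self.complete:
            raise RuntimeError("toy model: call ingest_all() first")
        # Replays cached batches only; upstream is never consulted again.
        return iter(self._batches)


src = FragileSource(["b0", "b1", "b2"])
cache = PreloadedCache(src)
cache.ingest_all()
src.close()  # upstream gone, but the cache is already complete
readers = [cache.reader() for _ in range(8)]
replayed = [list(r) for r in readers]  # each is ['b0', 'b1', 'b2']
```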
Bounded concurrent reads with max_readers
If you know upfront exactly how many readers will be created, set max_readers
to enable automatic eviction. Batches are freed from the cache once all
readers have advanced past them:
import threading
import pyarrow as pa
from batchcorder import StreamCache
ds = StreamCache(source, max_readers=4)
results = [None] * 4
# Create readers in the main thread to guarantee all slots are claimed.
readers = [ds.reader() for _ in range(4)]
def read(i):
    results[i] = pa.RecordBatchReader.from_stream(readers[i]).read_all()
threads = [threading.Thread(target=read, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
This keeps memory proportional to the gap between the fastest and
slowest reader. Without max_readers, all batches are retained for the
lifetime of the cache.
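The memory bound follows from one bookkeeping rule: a batch can be dropped once every reader's cursor is past it, so only the batches between the slowest cursor and the ingestion frontier stay resident. The toy below models only that rule, with a dict keyed by batch index. `EvictingCache` and `next_batch` are hypothetical, single-threaded, and assume every reader slot is already claimed.

```python
class EvictingCache:
    """Hypothetical model of slowest-reader eviction (single-threaded)."""

    def __init__(self, source, max_readers):
        self._upstream = iter(source)
        self._batches = {}   # batch index -> batch, only what is resident
        self._frontier = 0   # index of the next batch to ingest
        self._cursors = [0] * max_readers  # one cursor per reader slot

    def next_batch(self, reader_id):
        pos = self._cursors[reader_id]
        if pos == self._frontier:  # this reader is at the frontier: ingest
            try:
                self._batches[pos] = next(self._upstream)
            except StopIteration:
                return None
            self._frontier += 1
        batch = self._batches[pos]
        self._cursors[reader_id] = pos + 1
        self._evict()
        return batch

    def _evict(self):
        # Every batch below the slowest cursor has been seen by all readers.
        slowest = min(self._cursors)
        for index in [i for i in self._batches if i < slowest]:
            del self._batches[index]

    @property
    def resident(self):
        return len(self._batches)


cache = EvictingCache(range(100), max_readers=2)
for _ in range(10):
    cache.next_batch(0)  # fast reader reaches batch 10
for _ in range(3):
    cache.next_batch(1)  # slow reader reaches batch 3
print(cache.resident)    # 7: batches 3..9, the gap between the two readers
```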
Important: max_readers is a hard cap on the total number of
readers ever created, not the number of concurrent readers. Dropping a
reader does not free a slot for a new one. Once all max_readers
readers have been created, no further readers can be obtained. Eviction
does not begin until all max_readers readers have actually been
created; with fewer readers, every batch is retained so future readers
can still replay from the start.
Also note that once eviction has begun, reader(from_start=True) raises
ValueError because batch 0 is no longer available. Every call that
creates a reader consumes one slot: ds.reader(), iter(ds),
ds.__arrow_c_stream__() (and therefore every engine scan of ds), and
each read of a cast() result.
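The slot rules are easiest to see as a counter that only goes up. In this hypothetical sketch, both `CappedCache.reader()` and `iter(cache)` claim a slot, dropping a handle never returns one, and eviction is permitted only after every slot has been claimed. The class, its `eviction_allowed` property, and its error message are illustrative; the toy raises `RuntimeError` for an exhausted cap, which may not match the exception type batchcorder uses.

```python
class CappedCache:
    """Hypothetical model of max_readers slot accounting."""

    def __init__(self, max_readers):
        self.max_readers = max_readers
        self.created = 0  # total readers ever created; never decremented

    def reader(self):
        if self.created >= self.max_readers:
            raise RuntimeError("all reader slots have been used")
        self.created += 1
        return object()  # stand-in for a reader handle

    def __iter__(self):
        # iter(cache) creates a reader behind the scenes, so it claims a slot.
        self.reader()
        return iter(())  # toy: no real batches

    @property
    def eviction_allowed(self):
        # Until every slot is claimed, a future reader may still need batch 0.
        return self.created == self.max_readers


cache = CappedCache(max_readers=2)
r = cache.reader()
before = cache.eviction_allowed  # False: one slot is still unclaimed
del r                            # dropping a handle does not free its slot
list(cache)                      # iter(cache) claims the second slot
after = cache.eviction_allowed   # True
try:
    cache.reader()
except RuntimeError as exc:
    message = str(exc)           # "all reader slots have been used"
```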
For disk-backed caches, eviction frees memory (hot layer and index entries) but not bytes already written to the append-only cache file; disk space is reclaimed only when the cache is closed or dropped.
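The reason eviction cannot shrink the file is inherent to an append-only layout: each batch's bytes sit at some offset, and evicting a batch only forgets where they are. The sketch below models that with a temporary file and an in-memory offset index. `AppendOnlyStore` is hypothetical and says nothing about batchcorder's actual on-disk format.

```python
import os
import tempfile


class AppendOnlyStore:
    """Hypothetical append-only batch file with an in-memory index."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp()
        self._file = os.fdopen(fd, "w+b")
        self.index = {}  # batch number -> (offset, length)

    def append(self, batch_no, payload):
        self._file.seek(0, os.SEEK_END)
        offset = self._file.tell()
        self._file.write(payload)
        self._file.flush()
        self.index[batch_no] = (offset, len(payload))

    def evict(self, batch_no):
        # Forget the index entry (frees memory); the bytes stay in the file.
        del self.index[batch_no]

    def file_size(self):
        return os.path.getsize(self.path)

    def close(self):
        # Disk space comes back only when the file itself is removed.
        self._file.close()
        os.remove(self.path)


store = AppendOnlyStore()
for n in range(4):
    store.append(n, b"x" * 1024)
store.evict(0)
store.evict(1)
entries, size = len(store.index), store.file_size()  # 2 entries, 4096 bytes
store.close()
```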
Reading from __arrow_c_stream__
StreamCache itself implements __arrow_c_stream__, which creates a
fresh reader on every call. This lets DuckDB, DataFusion, and other
Arrow consumers perform multiple scans without any extra wrapping:
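import duckdb
# `ds` is the StreamCache created in the examples above. Each scan creates a
# fresh reader (and consumes a slot if max_readers is set).
duckdb.table("ds").show()
duckdb.sql("SELECT count(*) FROM ds").show()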
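A fresh reader per call matters because most stream objects are single-use: once a consumer drains them, a second scan sees nothing. The pure-Python analogy below contrasts a one-shot generator with a hypothetical `Rescannable` wrapper whose `__iter__` hands out a new cursor over cached items on every call, which is the same idea StreamCache applies through `__arrow_c_stream__`. It uses only the iterator protocol and is not Arrow code.

```python
def one_shot():
    yield from [1, 2, 3]


class Rescannable:
    """Hypothetical: caches items and hands out a fresh cursor per scan."""

    def __init__(self, source):
        self._source = iter(source)
        self._cache = []

    def __iter__(self):
        # A generator function: every call starts a new cursor at item 0.
        i = 0
        while True:
            if i == len(self._cache):
                try:
                    self._cache.append(next(self._source))
                except StopIteration:
                    return
            yield self._cache[i]
            i += 1


gen = one_shot()
plain = (sum(gen), sum(gen))     # (6, 0): the second scan finds it drained

data = Rescannable(one_shot())
cached = (sum(data), sum(data))  # (6, 6): each scan gets its own cursor
```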