Concurrent reads

Multiple StreamCacheReader handles backed by the same StreamCache are fully independent and thread-safe. This guide shows common patterns for reading a cached stream from several threads or tasks simultaneously.

Creating independent reader handles

Each call to StreamCache.reader() returns a fresh handle starting at batch 0 (or the current ingestion frontier when from_start=False). Handles share the underlying cache but maintain separate read positions.

import pyarrow as pa
from batchcorder import StreamCache

ds = StreamCache(source)

r1 = ds.reader()   # starts at batch 0
r2 = ds.reader()   # independent handle, also at batch 0

# r1 and r2 can advance at completely different rates.
result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()

Reading from multiple threads

StreamCache and StreamCacheReader are thread-safe. A common pattern is to hand each worker thread its own reader handle before starting the threads:

import threading
import pyarrow as pa
from batchcorder import StreamCache

ds = StreamCache(source)
results = [None] * 4

def read(i):
    results[i] = pa.RecordBatchReader.from_stream(ds.reader()).read_all()

threads = [threading.Thread(target=read, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

All threads share a single ingestion pass — the upstream source is read at most once regardless of how many readers are active.

Pre-ingesting before concurrent access

If you want to guarantee that all data is in the cache before any reader starts (useful when the upstream source may not be accessible later), call ingest_all() first:

ds = StreamCache(source)
ds.ingest_all()   # upstream fully consumed; cache is complete

# Safe to hand out readers to any number of threads now.
readers = [ds.reader() for _ in range(8)]

Bounded concurrent reads with max_readers

If you know the number of concurrent readers upfront, set max_readers to enable automatic eviction. Batches are freed from cache once all readers have advanced past them:

ds = StreamCache(source, max_readers=4)
results = [None] * 4

# Create readers in the main thread to guarantee all slots are claimed.
readers = [ds.reader() for _ in range(4)]

def read(i):
    results[i] = pa.RecordBatchReader.from_stream(readers[i]).read_all()

threads = [threading.Thread(target=read, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

This keeps memory proportional to the gap between the fastest and slowest reader. Without max_readers, all batches are retained for the lifetime of the cache.

Important: max_readers is a hard cap on the total number of readers ever created, not the number of concurrent readers. Dropping a reader does not free a slot for a new one. Once all max_readers readers have been created, no further readers can be obtained. Eviction does not begin until all max_readers readers have actually been created — with fewer readers, every batch is retained so future readers can still replay from the start.

Also note that once eviction has begun, reader(from_start=True) raises ValueError because batch 0 is no longer available. Anything that creates a reader counts against the cap: ds.reader(), iter(ds), ds.__arrow_c_stream__() (so each engine scan of ds), and each read of a cast() result all consume one slot.

For disk-backed caches, eviction frees memory (hot layer and index entries) but not bytes already written to the append-only cache file — disk space is reclaimed only when the cache is closed or dropped.

Reading from __arrow_c_stream__

StreamCache itself implements __arrow_c_stream__, which creates a fresh reader on every call. This lets DuckDB, DataFusion, and other Arrow consumers perform multiple scans without any extra wrapping:

import duckdb

# DuckDB can scan `ds` as many times as needed.
duckdb.table("ds").show()