Configure cache storage¶
StreamCache supports two storage modes. Choose based on whether your
dataset fits in RAM.
Memory-only mode (default)¶
When disk_path and disk_capacity are omitted, every batch is stored
as a reference-counted pointer (Arc<RecordBatch>) in RAM. Reads are
zero-copy and no serialisation happens.
from batchcorder import StreamCache
ds = StreamCache(source) # all batches kept in RAM
Batches accumulate in RAM as the stream is ingested, and memory_capacity
has no effect in this mode. Use disk mode if your dataset exceeds
available RAM.
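One way to make this choice in code is to compare an estimated dataset size with the RAM available on the machine. The sketch below is illustrative only: `pick_storage_kwargs`, the `headroom` fraction, and the `slack` factor are hypothetical and not part of batchcorder. Only the keyword names `disk_path` and `disk_capacity` come from this page.

```python
import os


def physical_ram_bytes() -> int:
    """Total physical RAM in bytes (POSIX only, via sysconf)."""
    return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")


def pick_storage_kwargs(estimated_bytes, ram_bytes, disk_path,
                        headroom=0.5, slack=1.25):
    """Return keyword arguments for StreamCache (hypothetical helper).

    Memory-only mode is chosen when the estimate fits within
    headroom * RAM. Otherwise disk mode is chosen, with a disk budget
    of slack * estimate. Both factors are arbitrary assumptions.
    """
    if estimated_bytes <= ram_bytes * headroom:
        return {}  # omit disk_path and disk_capacity: memory-only mode
    return {
        "disk_path": disk_path,
        "disk_capacity": int(estimated_bytes * slack),
    }


small = pick_storage_kwargs(1 << 30, 8 << 30, "/tmp/cache")
large = pick_storage_kwargs(8 << 30, 8 << 30, "/tmp/cache")
```

The resulting dictionary could then be splatted into the constructor, for example `StreamCache(source, **large)`.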
Disk mode¶
Provide both disk_path and disk_capacity to enable on-disk storage.
Every batch is serialised to an append-only Arrow IPC file at ingestion
time. A hot layer keeps recently ingested batches in RAM to avoid
redundant disk reads.
import tempfile
from batchcorder import StreamCache
tmp = tempfile.mkdtemp()
ds = StreamCache(
    source,
    disk_path=tmp,
    disk_capacity=2 << 30,  # 2 GiB disk budget
)
Concurrent readers use positional I/O (pread-style) to read different
offsets simultaneously without blocking each other.
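To illustrate why positional reads let readers proceed independently, here is a minimal sketch of an append-only file with an in-memory offset index. It is not the batchcorder implementation: `AppendOnlyLog` is a hypothetical class, plain byte strings stand in for serialised Arrow IPC batches, and `os.pread` is only available on Unix-like systems.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


class AppendOnlyLog:
    """Toy append-only record file with an (offset, length) index.

    Assumes `path` refers to a new, empty file.
    """

    def __init__(self, path):
        # O_APPEND makes every write land at the current end of the file.
        self._fd = os.open(path, os.O_RDWR | os.O_CREAT | os.O_APPEND, 0o600)
        self._index = []  # record number -> (offset, length)
        self._end = 0

    def append(self, payload: bytes) -> int:
        written = os.write(self._fd, payload)
        if written != len(payload):
            raise OSError("short write")
        self._index.append((self._end, len(payload)))
        self._end += len(payload)
        return len(self._index) - 1

    def read(self, record_no: int) -> bytes:
        offset, length = self._index[record_no]
        # pread takes an explicit offset and never moves the shared file
        # position, so concurrent readers need no lock around seek + read.
        return os.pread(self._fd, length, offset)

    def close(self):
        os.close(self._fd)


with tempfile.TemporaryDirectory() as tmp_dir:
    log = AppendOnlyLog(os.path.join(tmp_dir, "cache.bin"))
    ids = [log.append(f"batch-{i}".encode()) for i in range(100)]
    # Four threads read different offsets of the same file descriptor.
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(log.read, ids))
    log.close()
```

With a `seek` followed by `read`, the same threads would race on the single file position and would need a lock, which is the blocking that positional I/O avoids.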
Tuning the hot layer¶
In disk mode, memory_capacity controls how many bytes of recently
ingested batches are kept in RAM. It defaults to total physical RAM.
ds = StreamCache(
    source,
    memory_capacity=256 << 20,  # 256 MiB hot layer
    disk_path=tmp,
    disk_capacity=2 << 30,  # 2 GiB disk budget
)
Set memory_capacity to cover the data your hottest readers are
actively consuming. Batches that exceed the hot budget are served
directly from disk with no in-memory copy.
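The behaviour described above can be pictured with a small model of a byte-budgeted hot layer. This is a sketch under stated assumptions, not the library's code: `HotLayer` is a hypothetical class, a dictionary stands in for the cache file, and the eviction order (oldest ingested first) is inferred from the phrase "recently ingested" rather than confirmed.

```python
from collections import OrderedDict


class HotLayer:
    """Toy byte-budgeted layer keeping the most recently ingested batches."""

    def __init__(self, memory_capacity: int):
        self.memory_capacity = memory_capacity
        self._hot = OrderedDict()  # batch index -> payload, oldest first
        self._hot_bytes = 0
        self._disk = {}  # stand-in for the on-disk cache file
        self.disk_reads = 0

    def ingest(self, index: int, payload: bytes) -> None:
        self._disk[index] = payload  # every batch is persisted
        self._hot[index] = payload
        self._hot_bytes += len(payload)
        # Drop the oldest hot entries until the budget is respected.
        while self._hot_bytes > self.memory_capacity and self._hot:
            _, evicted = self._hot.popitem(last=False)
            self._hot_bytes -= len(evicted)

    def hot_indices(self):
        return list(self._hot)

    def get(self, index: int) -> bytes:
        if index in self._hot:
            return self._hot[index]
        # Miss: serve from disk without copying back into the hot layer.
        self.disk_reads += 1
        return self._disk[index]


layer = HotLayer(memory_capacity=30)
for i in range(5):
    layer.ingest(i, bytes(10))  # five 10-byte batches, 30-byte budget

recent = layer.get(4)  # still hot
old = layer.get(0)     # outside the budget, read from "disk"
```

In this model a reader that lags more than 30 bytes behind ingestion always pays a disk read, which is why the budget should cover the span your most active readers are working through.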
Bounded-memory streaming with max_readers¶
When you know how many times the stream will be read, set max_readers
to evict batches once all readers have advanced past them. This keeps
memory usage proportional to the window between the slowest and fastest
reader, rather than the full stream.
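import pyarrow as pa
from batchcorder import StreamCache
ds = StreamCache(
    source,
    max_readers=2,  # at most 2 reads of the stream
)
r1 = ds.reader()
r2 = ds.reader()
# Batches behind the slowest reader are freed as both readers advance.
# Draining r1 completely before r2 starts, as below, keeps the whole stream
# cached until r2 catches up; interleave the reads to keep the window small.
result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()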
Important: max_readers is a hard cap on the total number of
readers ever created — dropping a reader does not free a slot for a new
one. Once all max_readers readers have been created, no further
readers can be obtained. Eviction does not begin until all max_readers
readers have actually been created: with fewer readers, every batch is
retained so that future readers can still replay from the start.
Additionally, once batches have been evicted, reader(from_start=True)
raises ValueError because batch 0 is no longer available.
For disk-backed caches, eviction frees memory (hot layer and index entries) but not bytes already written to the append-only cache file — disk space is reclaimed only when the cache is closed or dropped.
When max_readers is omitted (the default), all batches are retained
indefinitely and any number of readers can be created.
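The eviction rules in this section can be modelled in plain Python. The sketch below is a toy, not the batchcorder implementation: `WindowCache` and its attributes are hypothetical, every reader starts at batch 0, and raising `RuntimeError` when the cap is exceeded is an assumption (this page does not say which exception the library raises).

```python
class WindowCache:
    """Toy model of max_readers eviction."""

    def __init__(self, batches, max_readers=None):
        self._source = iter(batches)
        self._max_readers = max_readers
        self._batches = {}    # absolute batch index -> batch
        self._next_index = 0  # index of the next batch to pull upstream
        self._first_kept = 0  # lowest index still retained
        self._positions = []  # next index each reader will consume
        self.peak_resident = 0

    @property
    def resident(self):
        return len(self._batches)

    def reader(self):
        cap = self._max_readers
        if cap is not None and len(self._positions) >= cap:
            raise RuntimeError("all max_readers slots have been used")
        self._positions.append(0)  # slots are never returned
        return self._iterate(len(self._positions) - 1)

    def _iterate(self, slot):
        while True:
            pos = self._positions[slot]
            if pos == self._next_index:
                # At the frontier: pull one more batch from upstream.
                try:
                    self._batches[pos] = next(self._source)
                except StopIteration:
                    return
                self._next_index += 1
                self.peak_resident = max(self.peak_resident, self.resident)
            batch = self._batches[pos]
            self._positions[slot] = pos + 1
            self._evict()
            yield batch

    def _evict(self):
        # Eviction starts only once every allowed reader exists.
        cap = self._max_readers
        if cap is None or len(self._positions) < cap:
            return
        slowest = min(self._positions)
        while self._first_kept < slowest:
            del self._batches[self._first_kept]
            self._first_kept += 1


# Lockstep readers: the window never grows beyond one batch.
lockstep = WindowCache(range(6), max_readers=2)
pairs = list(zip(lockstep.reader(), lockstep.reader()))

# Sequential readers: the first drains everything before the second starts.
sequential = WindowCache(range(6), max_readers=2)
first, second = sequential.reader(), sequential.reader()
out_first, out_second = list(first), list(second)
```

In the lockstep case at most one batch is resident at a time, while in the sequential case all six batches stay resident until the second reader advances, which is the "window between the slowest and fastest reader" described above.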
Training-loop workloads¶
For ML training loops where you replay the full stream each epoch, call
ingest_all() once at startup:
import tempfile
import pyarrow.parquet as pq
from batchcorder import StreamCache
tmp = tempfile.mkdtemp()
ds = StreamCache(
    pq.read_table("shard_0.parquet"),
    disk_path=tmp,
    disk_capacity=4 << 30,  # 4 GiB disk budget
)
ds.ingest_all()  # pull everything into cache before the training loop
All subsequent reads come from cache — no upstream I/O on replay.
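A per-epoch replay loop might then look like the following sketch. `run_epochs`, `open_reader`, and `train_step` are hypothetical names, and a plain list stands in for the cached stream so the example runs without batchcorder installed.

```python
def run_epochs(open_reader, num_epochs, train_step):
    """Replay a cached stream once per epoch.

    open_reader: zero-argument callable returning a fresh iterable of batches.
    train_step:  callable invoked as train_step(epoch, batch).
    Returns the total number of batches processed.
    """
    seen = 0
    for epoch in range(num_epochs):
        for batch in open_reader():
            train_step(epoch, batch)
            seen += 1
    return seen


# Stand-in data: three "batches" replayed for two epochs.
fake_batches = ["b0", "b1", "b2"]
steps = []
total = run_epochs(
    lambda: iter(fake_batches),
    num_epochs=2,
    train_step=lambda epoch, batch: steps.append((epoch, batch)),
)
```

With a real cache, `open_reader` could be `lambda: pa.RecordBatchReader.from_stream(ds.reader())`, following the reader pattern shown earlier on this page. Because each epoch creates a new reader, leave max_readers unset for this workload or set it to at least the number of epochs.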