Configure cache storage

StreamCache supports two storage modes. Choose based on whether your dataset fits in RAM.

Memory-only mode (default)

When disk_path and disk_capacity are omitted, every batch is stored as a reference-counted pointer (Arc<RecordBatch>) in RAM. Reads are zero-copy and no serialisation happens.

from batchcorder import StreamCache

ds = StreamCache(source)   # all batches kept in RAM

Batches accumulate until the stream is fully ingested. memory_capacity has no effect in this mode — use disk mode if your dataset exceeds available RAM.

Disk mode

Provide both disk_path and disk_capacity to enable on-disk storage. Every batch is serialised to an append-only Arrow IPC file at ingestion time. A hot layer keeps recently ingested batches in RAM to avoid redundant disk reads.

import tempfile
from batchcorder import StreamCache

tmp = tempfile.mkdtemp()
ds = StreamCache(
    source,
    disk_path=tmp,
    disk_capacity=2 << 30,   # 2 GB disk budget
)

Concurrent readers use positional I/O (pread-style) to read different offsets simultaneously without blocking each other.

Tuning the hot layer

In disk mode, memory_capacity controls how many bytes of recently ingested batches are kept in RAM. Defaults to total physical RAM.

ds = StreamCache(
    source,
    memory_capacity=256 << 20,  # 256 MB hot layer
    disk_path=tmp,
    disk_capacity=2 << 30,
)

Set memory_capacity to cover the data your hottest readers are actively consuming. Batches that exceed the hot budget are served directly from disk with no in-memory copy.

Bounded-memory streaming with max_readers

When you know how many times the stream will be read, set max_readers to evict batches once all readers have advanced past them. This keeps memory usage proportional to the window between the slowest and fastest reader, rather than the full stream.

ds = StreamCache(
    source,
    max_readers=2,   # at most 2 reads of the stream
)

r1 = ds.reader()
r2 = ds.reader()

# As both readers advance, batches behind the slowest are freed.
result1 = pa.RecordBatchReader.from_stream(r1).read_all()
result2 = pa.RecordBatchReader.from_stream(r2).read_all()

Important: max_readers is a hard cap on the total number of readers ever created — dropping a reader does not free a slot for a new one. Once all max_readers readers have been created, no further readers can be obtained. Eviction does not begin until all max_readers readers have actually been created: with fewer readers, every batch is retained so that future readers can still replay from the start. Additionally, once batches have been evicted, reader(from_start=True) raises ValueError because batch 0 is no longer available.

For disk-backed caches, eviction frees memory (hot layer and index entries) but not bytes already written to the append-only cache file — disk space is reclaimed only when the cache is closed or dropped.

When max_readers is omitted (default), all batches are retained indefinitely and unlimited readers are allowed.

Training-loop workloads

For ML training loops where you replay the full stream each epoch, call ingest_all() once at startup:

import os, tempfile
import pyarrow.parquet as pq
from batchcorder import StreamCache

tmp = tempfile.mkdtemp()
ds = StreamCache(
    pq.read_table("shard_0.parquet"),
    disk_path=tmp,
    disk_capacity=4 << 30,  # 4 GB
)
ds.ingest_all()   # pull everything into cache before the training loop

All subsequent reads come from cache — no upstream I/O on replay.