batchcorder

Batchcorder: Replayable cached Arrow record-batch streams.

Classes

StreamCache

A cached Arrow stream backed by an in-memory Vec or an on-disk IPC file.

StreamCacheReader

A single-use iterator handle for a StreamCache.

CastingStreamCache

A replayable cast view of a StreamCache.

Package Contents

class batchcorder.StreamCache(reader: Any, memory_capacity: int | None = None, disk_path: str | None = None, disk_capacity: int | None = None, write_policy: str = 'on_insertion', max_readers: int | None = None)

A cached Arrow stream backed by an in-memory Vec or an on-disk IPC file.

Wraps any Arrow stream source and stores each RecordBatch so multiple independent StreamCacheReader handles can replay the full stream from any position. The upstream source is ingested lazily on demand and consumed exactly once.
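The lazy, ingest-once behaviour can be illustrated with a small pure-Python model. This is a hypothetical sketch, not batchcorder's implementation. ToyStreamCache, its reader() generator and the string "batches" are stand-ins, and the model has no Arrow data, disk tier or eviction. It shows three things: creating readers pulls nothing, the first read triggers the first upstream pull, and a second reader replays everything from the cache without touching the upstream again.

```python
from typing import Iterable, Iterator, List


class ToyStreamCache:
    """Hypothetical model of lazy, replayable caching (not batchcorder code)."""

    def __init__(self, upstream: Iterable[str]) -> None:
        self._upstream = iter(upstream)
        self._batches: List[str] = []  # everything ingested so far
        self.upstream_exhausted = False

    def _ingest_next(self) -> bool:
        # Pull exactly one item from upstream; False once upstream is done.
        if self.upstream_exhausted:
            return False
        try:
            self._batches.append(next(self._upstream))
            return True
        except StopIteration:
            self.upstream_exhausted = True
            return False

    def reader(self) -> Iterator[str]:
        # Each reader keeps its own position. Cached batches are served directly;
        # upstream is only pulled when this reader reaches the ingestion frontier.
        pos = 0
        while pos < len(self._batches) or self._ingest_next():
            yield self._batches[pos]
            pos += 1


pulled: List[str] = []


def upstream() -> Iterator[str]:
    for name in ("batch0", "batch1", "batch2"):
        pulled.append(name)  # record every upstream pull
        yield name


cache = ToyStreamCache(upstream())
r1, r2 = cache.reader(), cache.reader()  # no upstream pulls yet
head = next(r1)  # the first pull happens here, on demand
pulled_after_head = list(pulled)
rest = list(r1)  # r1 drains the upstream
replay = list(r2)  # r2 replays from the cache only
```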

Two storage modes are supported:

  • Memory-only (omit disk_path / disk_capacity): batches are kept as reference-counted pointers in RAM. Reads are zero-copy; no IPC serialisation happens.

  • Disk (provide both disk_path and disk_capacity): batches are serialised to an append-only Arrow IPC file. A configurable hot layer (memory_capacity) keeps recently ingested batches in RAM to reduce disk reads.
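The interaction between the hot layer and write_policy in disk mode can be sketched the same way. ToyHotLayer below is hypothetical. It tracks only batch indices and byte sizes, assumes the oldest batch is evicted first when the budget is exceeded, and records which batches would be written to the IPC file instead of writing anything.

```python
from collections import OrderedDict
from typing import List


class ToyHotLayer:
    """Hypothetical model of the hot layer and write_policy (not batchcorder code)."""

    def __init__(self, capacity_bytes: int, write_policy: str = "on_insertion") -> None:
        if write_policy not in ("on_insertion", "on_eviction"):
            raise ValueError(f"unknown write_policy: {write_policy!r}")
        self.capacity_bytes = capacity_bytes
        self.write_policy = write_policy
        self.hot: "OrderedDict[int, int]" = OrderedDict()  # batch index -> size in bytes
        self.disk_writes: List[int] = []  # batch indices that would hit the IPC file

    def insert(self, index: int, size: int) -> None:
        if self.write_policy == "on_insertion":
            self.disk_writes.append(index)  # written to disk immediately
        self.hot[index] = size
        # Assumption: the oldest batches are evicted first until the budget fits.
        while sum(self.hot.values()) > self.capacity_bytes:
            evicted, _ = self.hot.popitem(last=False)
            if self.write_policy == "on_eviction":
                self.disk_writes.append(evicted)  # written only when evicted


def disk_writes_for(capacity_bytes: int, write_policy: str) -> List[int]:
    layer = ToyHotLayer(capacity_bytes, write_policy)
    for index, size in enumerate([4, 4, 4]):  # three toy batches of 4 bytes each
        layer.insert(index, size)
    return layer.disk_writes


print(disk_writes_for(100, "on_insertion"))  # [0, 1, 2]
print(disk_writes_for(100, "on_eviction"))  # []
print(disk_writes_for(8, "on_eviction"))  # [0]
```

With a budget large enough for the whole stream, "on_eviction" records no disk writes, which is the behaviour described for write_policy.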

Parameters:
reader : object

Any object implementing __arrow_c_stream__ (e.g. pyarrow.Table, pyarrow.RecordBatchReader).

memory_capacity : int, optional

Hot-layer budget in bytes for disk mode. Defaults to total physical RAM. Ignored in memory-only mode.

disk_path : str, optional

Directory for the on-disk IPC file. Created on first use. Must be provided together with disk_capacity.

disk_capacity : int, optional

On-disk storage budget in bytes. Must be provided together with disk_path.

write_policy : str, optional

When batches are flushed to disk (disk mode only; ignored in memory-only mode). "on_insertion" (default) writes every batch to disk immediately. "on_eviction" keeps batches in the hot layer and only writes them to disk when evicted, so a hot layer large enough to hold the whole stream never touches disk.

max_readers : int, optional

Hard cap on the total number of readers ever created from this cache. When set, batches are evicted once all readers have advanced past them, enabling bounded-memory streaming. Eviction begins only after all max_readers readers have been created; until then every batch is retained so that readers created later can still replay from the start. Dropping a reader does not free a slot: once max_readers readers have been created, no more can be obtained. reader(from_start=True) raises ValueError if batch 0 has already been evicted. For disk-backed caches, eviction frees memory (the hot layer and index) but not space in the append-only cache file; disk space is reclaimed only when the cache is closed or dropped. When None (default), all batches are retained indefinitely.
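The max_readers eviction rule can be modelled with one cursor per reader. ToyBoundedCache is a hypothetical sketch that tracks positions only. Raising ValueError when the cap is reached is the toy's own choice, since the exception type for that case is not documented here.

```python
from typing import List


class ToyBoundedCache:
    """Hypothetical model of max_readers eviction (not batchcorder code)."""

    def __init__(self, max_readers: int) -> None:
        self.max_readers = max_readers
        self.positions: List[int] = []  # next batch index, one entry per reader created
        self.evicted_prefix = 0  # batches [0, evicted_prefix) have been evicted

    def new_reader(self) -> int:
        # Slots are never returned, so the cap counts every reader ever created.
        if len(self.positions) >= self.max_readers:
            raise ValueError("max_readers readers have already been created")
        self.positions.append(0)  # each new reader starts at batch 0
        return len(self.positions) - 1

    def advance(self, reader_id: int) -> None:
        self.positions[reader_id] += 1
        # No eviction until all max_readers readers exist; afterwards every
        # batch that all readers have moved past can be dropped.
        if len(self.positions) == self.max_readers:
            self.evicted_prefix = max(self.evicted_prefix, min(self.positions))


cache = ToyBoundedCache(max_readers=2)
a = cache.new_reader()
for _ in range(3):
    cache.advance(a)
evicted_with_one_reader = cache.evicted_prefix  # 0: a second reader may still need batch 0
b = cache.new_reader()
cache.advance(b)
evicted_with_two_readers = cache.evicted_prefix  # 1: both readers are past batch 0
```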

Examples

Memory-only:

>>> import pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"id": [1, 2, 3], "val": [0.5, 1.0, 1.5]})
>>> ds = StreamCache(table)
>>> pa.RecordBatchReader.from_stream(ds).read_all().equals(table)
True

Disk mode:

>>> import tempfile
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, memory_capacity=16 << 20, disk_path=tmp, disk_capacity=64 << 20)
>>> pa.RecordBatchReader.from_stream(ds).read_all().equals(table)
True
>>> ds.upstream_exhausted
True


property schema : pyarrow.Schema

Arrow schema of this dataset.

Return type:

pyarrow.Schema

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"id": [1, 2], "val": [0.5, 1.0]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> [f.name for f in ds.schema]
['id', 'val']

property ingested_count : int

Number of batches pulled from the upstream source so far.

Increments lazily as readers consume batches.

Return type:

int

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.ingested_count
0
>>> ds.ingest_all()
1
>>> ds.ingested_count
1

property upstream_exhausted : bool

True once the upstream source has been fully consumed.

Return type:

bool

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.upstream_exhausted
False
>>> ds.ingest_all()
1
>>> ds.upstream_exhausted
True

reader(from_start: bool = True) → StreamCacheReader

Return a new StreamCacheReader handle.

Parameters:
from_start : bool, optional

If True (default), the reader starts at batch 0 and replays the full stream. If False, it starts at the current ingestion frontier and yields only batches ingested after this call.

Return type:

StreamCacheReader

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> r1 = ds.reader()
>>> r2 = ds.reader()
>>> r1.closed, r2.closed
(False, False)
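The difference between from_start=True and from_start=False comes down to where the new reader's cursor starts. In this hypothetical model (ToyFrontier and ToyReader are not batchcorder classes, and ingestion is explicit rather than lazy) the frontier is simply the number of batches ingested so far.

```python
from typing import List


class ToyReader:
    def __init__(self, batches: List[str], position: int) -> None:
        self._batches = batches  # shared with the cache, so later ingests are visible
        self._position = position

    def drain(self) -> List[str]:
        # Return every batch from this reader's cursor up to the current frontier.
        out = self._batches[self._position:]
        self._position = len(self._batches)
        return out


class ToyFrontier:
    """Hypothetical model of reader start positions (not batchcorder code)."""

    def __init__(self) -> None:
        self.batches: List[str] = []

    def ingest(self, batch: str) -> None:
        self.batches.append(batch)

    def reader(self, from_start: bool = True) -> ToyReader:
        # from_start=True begins at batch 0; False begins at the ingestion frontier.
        start = 0 if from_start else len(self.batches)
        return ToyReader(self.batches, start)


cache = ToyFrontier()
cache.ingest("batch0")
cache.ingest("batch1")
full = cache.reader(from_start=True)
tail = cache.reader(from_start=False)
cache.ingest("batch2")
print(full.drain())  # ['batch0', 'batch1', 'batch2']
print(tail.drain())  # ['batch2']
```

Going by the descriptions of reader() and ingested_count above, the frontier corresponds to ingested_count at the moment reader(from_start=False) is called.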

__iter__() → StreamCacheReader

Iterate over all batches from the start.

Creates a fresh StreamCacheReader starting at batch 0 and returns it as the iterator.

Return type:

StreamCacheReader

__arrow_c_stream__(requested_schema: Any = None) → Any

Enable Arrow stream export via the PyCapsule Interface.

This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.

Creates a fresh reader starting at batch 0. Allows the dataset to be consumed directly by PyArrow, DuckDB, DataFusion, and any other Arrow-compatible library.

Parameters:
requested_schema : object, optional

Schema capsule to cast the stream to, or None.

Returns:

An Arrow C stream capsule wrapping a fresh reader.

Return type:

PyCapsule

__arrow_c_schema__() → Any

Enable Arrow schema export via the PyCapsule Interface.

This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.

Arrow consumers can use this to inspect the schema of this StreamCache and then ask the producer (through the requested_schema argument of __arrow_c_stream__) to cast the exported data to a type they support.

Returns:

An Arrow C schema capsule for the stream’s schema.

Return type:

PyCapsule

cast(target_schema: Any) → CastingStreamCache

Cast the dataset to produce batches with the given schema.

Returns a CastingStreamCache — a replayable wrapper that applies the schema cast on every read. Unlike pyarrow.RecordBatchReader.cast(), the result can be consumed multiple times, making it suitable for DuckDB self-joins and ASOF joins.

Parameters:
target_schema : object

Any Arrow schema-compatible object (e.g. pyarrow.Schema, or any other object implementing __arrow_c_schema__).

Return type:

CastingStreamCache
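The replayable-versus-one-shot distinction can be seen without Arrow at all. In this hypothetical model a plain Python conversion function stands in for a schema cast. map() over the batches behaves like a single-use cast reader, while ToyCastView re-applies the conversion on every pass and chains further conversions, in the way CastingStreamCache.cast() is described as doing.

```python
from typing import Callable, Iterator, List

Batch = List[int]


class ToyCastView:
    """Hypothetical replayable cast view (not batchcorder code)."""

    def __init__(self, batches: List[Batch], convert: Callable[[Batch], list]) -> None:
        self._batches = batches  # the cached source batches, never modified
        self._convert = convert

    def __iter__(self) -> Iterator[list]:
        # Every iteration is a fresh pass that converts batches as they are read.
        return (self._convert(batch) for batch in self._batches)

    def cast(self, convert: Callable[[list], list]) -> "ToyCastView":
        # Chaining composes conversions into a new view over the same batches.
        previous = self._convert
        return ToyCastView(self._batches, lambda batch: convert(previous(batch)))


def to_float(batch: Batch) -> list:
    return [float(x) for x in batch]


def to_str(batch: list) -> list:
    return [str(x) for x in batch]


batches = [[1, 2], [3]]
one_shot = map(to_float, batches)  # like casting a single-use reader
view = ToyCastView(batches, to_float)  # like a CastingStreamCache

one_shot_passes = (list(one_shot), list(one_shot))  # second pass is empty
view_passes = (list(view), list(view))  # both passes see every batch
chained = list(view.cast(to_str))
```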

ingest_all() → int

Eagerly ingest all batches from the upstream source into the cache.

After this call upstream_exhausted is True and the upstream reference is released. Subsequent reads are served entirely from cache. Calling this method more than once is safe and idempotent.

Returns:

Total number of batches ingested (including any ingested previously).

Return type:

int

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.ingest_all()
1
>>> ds.upstream_exhausted
True

close() → None

Close the dataset and destroy the underlying storage.

This clears the cache (the in-memory layer and, in disk mode, the on-disk layer) and destroys the disk storage, removing any files that were created eagerly but never used.

Return type:

None

Examples

>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.close()

class batchcorder.StreamCacheReader(impl: _batchcorder.StreamCacheReader)

A single-use iterator handle for a StreamCache.

Maintains an independent read position. Multiple handles backed by the same dataset share the underlying cache; the upstream source is ingested lazily as needed.

Once consumed, either via __arrow_c_stream__ or by exhausting iteration, the reader is marked closed and raises an error on further use.
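The single-use contract can be modelled in a few lines. ToySingleUseReader is hypothetical. Its export() method stands in for __arrow_c_stream__, and it raises ValueError on any use after close, mirroring the Raises entries below.

```python
from typing import Iterator, List, Optional


class ToySingleUseReader:
    """Hypothetical model of a single-use reader handle (not batchcorder code)."""

    def __init__(self, batches: List[str], schema: str) -> None:
        self._it: Optional[Iterator[str]] = iter(batches)
        self._schema = schema

    @property
    def closed(self) -> bool:
        return self._it is None

    def _require_open(self) -> Iterator[str]:
        if self._it is None:
            raise ValueError("reader has already been consumed")
        return self._it

    @property
    def schema(self) -> str:
        self._require_open()
        return self._schema

    def __iter__(self) -> "ToySingleUseReader":
        return self

    def __next__(self) -> str:
        it = self._require_open()
        try:
            return next(it)
        except StopIteration:
            self._it = None  # exhausting iteration closes the reader
            raise

    def export(self) -> Iterator[str]:
        # Stand-in for __arrow_c_stream__: hand over the remaining batches and close.
        it = self._require_open()
        self._it = None
        return it


drained_reader = ToySingleUseReader(["b0", "b1"], schema="x: int64")
drained = list(drained_reader)

exported_reader = ToySingleUseReader(["b0", "b1"], schema="x: int64")
exported = list(exported_reader.export())

error: Optional[str] = None
try:
    _ = exported_reader.schema  # any use after close raises
except ValueError as exc:
    error = str(exc)
```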

Notes

Obtain a handle from StreamCache.reader() rather than constructing one directly.

property schema : pyarrow.Schema

Arrow schema of batches produced by this reader.

Return type:

pyarrow.Schema

Raises:

ValueError – If the reader has already been consumed.

property closed : bool

True if this reader has been consumed.

Return type:

bool

__arrow_c_stream__(requested_schema: Any = None) → Any

Enable Arrow stream export via the PyCapsule Interface.

This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.

Consumes the reader; subsequent calls raise an error.

Parameters:
requested_schema : object, optional

Schema capsule to cast the stream to, or None.

Returns:

An Arrow C stream capsule wrapping this reader.

Return type:

PyCapsule

Raises:

ValueError – If the reader has already been consumed.

__arrow_c_schema__() → Any

Enable Arrow schema export via the PyCapsule Interface.

This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.

Arrow consumers can use this to inspect the schema of this StreamCacheReader and then ask the producer (through the requested_schema argument of __arrow_c_stream__) to cast the exported data to a type they support.

Returns:

An Arrow C schema capsule for the reader’s schema.

Return type:

PyCapsule

Raises:

ValueError – If the reader has already been consumed.

__iter__() → StreamCacheReader

Return self as the iterator.

Return type:

StreamCacheReader

cast(target_schema: Any) → pyarrow.RecordBatchReader

Cast the reader to produce batches with the given schema.

Mirrors pyarrow.RecordBatchReader.cast(). Returns a pyarrow.RecordBatchReader that applies the cast as batches are read. Consumes this reader.

Parameters:
target_schema : object

Any Arrow schema-compatible object (e.g. pyarrow.Schema, or any other object implementing __arrow_c_schema__).

Return type:

pyarrow.RecordBatchReader

Raises:

ValueError – If the reader has already been consumed.

__next__() → pyarrow.RecordBatch

Get the next batch from the reader.

Return type:

pyarrow.RecordBatch

class batchcorder.CastingStreamCache(impl: _batchcorder.CastingStreamCache)

A replayable cast view of a StreamCache.

Created by StreamCache.cast(). Each call to __arrow_c_stream__ produces a fresh reader over the underlying cache with every batch cast to the target schema, so this object is replayable: DuckDB self-joins, ASOF joins, and other multi-scan consumers work correctly on it.

Notes

Obtain via StreamCache.cast() rather than constructing directly.

property schema : pyarrow.Schema

Arrow schema produced by this dataset after casting.

Return type:

pyarrow.Schema

__arrow_c_stream__(requested_schema: Any = None) → Any

Enable Arrow stream export via the PyCapsule Interface.

Creates a fresh reader from the underlying cache and applies the cast. Safe to call multiple times — each call produces an independent stream.

Parameters:
requested_schema : object, optional

Schema capsule to further cast the stream to, or None (uses schema).

Returns:

An Arrow C stream capsule wrapping a fresh casting reader.

Return type:

PyCapsule

__arrow_c_schema__() → Any

Enable Arrow schema export via the PyCapsule Interface.

Returns the target schema so consumers can inspect the post-cast type.

Returns:

An Arrow C schema capsule for the post-cast schema.

Return type:

PyCapsule

cast(target_schema: Any) → CastingStreamCache

Cast to a further target schema, returning a new CastingStreamCache.

Parameters:
target_schema : object

Any Arrow schema-compatible object.

Return type:

CastingStreamCache