batchcorder
Batchcorder: Replayable cached Arrow record-batch streams.
Classes
StreamCache | A cached Arrow stream backed by an in-memory Vec or an on-disk IPC file.
StreamCacheReader | A single-use iterator handle for a StreamCache.
CastingStreamCache | A replayable cast view of a StreamCache.
Package Contents
- class batchcorder.StreamCache(reader: Any, memory_capacity: int | None = None, disk_path: str | None = None, disk_capacity: int | None = None, write_policy: str = 'on_insertion', max_readers: int | None = None)
A cached Arrow stream backed by an in-memory Vec or an on-disk IPC file.
Wraps any Arrow stream source and stores each RecordBatch so that multiple independent StreamCacheReader handles can replay the full stream from any position. The upstream source is ingested lazily, on demand, and consumed exactly once.
Two storage modes are supported:
- Memory-only (omit disk_path/disk_capacity): batches are kept as reference-counted pointers in RAM. Reads are zero-copy; no IPC serialisation happens.
- Disk (provide both disk_path and disk_capacity): batches are serialised to an append-only Arrow IPC file. A configurable hot layer (memory_capacity) keeps recently ingested batches in RAM to reduce disk reads.
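The lazy, consumed-once ingestion described above can be pictured with a small pure-Python toy model. This is not batchcorder's implementation: ToyReplayCache and its methods are hypothetical, a plain list stands in for both storage modes, and strings stand in for RecordBatch objects. It only illustrates that each upstream item is pulled once, on demand, while every reader keeps its own position.

```python
from typing import Iterator, List, Optional


class ToyReplayCache:
    """Hypothetical toy model of lazy, replayable caching (not batchcorder's code)."""

    def __init__(self, upstream: Iterator[str]) -> None:
        self._upstream: Optional[Iterator[str]] = upstream
        self._batches: List[str] = []  # stands in for the memory or disk store

    @property
    def ingested_count(self) -> int:
        return len(self._batches)

    @property
    def upstream_exhausted(self) -> bool:
        return self._upstream is None

    def _ensure(self, index: int) -> bool:
        """Pull from upstream until `index` is cached; False if the stream ends first."""
        while index >= len(self._batches) and self._upstream is not None:
            try:
                self._batches.append(next(self._upstream))
            except StopIteration:
                self._upstream = None  # release the upstream reference
        return index < len(self._batches)

    def reader(self) -> Iterator[str]:
        """Each reader is an independent cursor that starts at batch 0."""
        position = 0
        while self._ensure(position):
            yield self._batches[position]
            position += 1


pulls: List[str] = []


def upstream() -> Iterator[str]:
    for name in ["b0", "b1", "b2"]:
        pulls.append(name)  # record every pull from the source
        yield name


cache = ToyReplayCache(upstream())
first = cache.reader()
next(first)                    # ingests only b0; ingested_count is now 1
replay = list(cache.reader())  # a second reader drains the rest of upstream
rest = list(first)             # the first reader resumes from its own position
# replay == ["b0", "b1", "b2"], rest == ["b1", "b2"], pulls == ["b0", "b1", "b2"]
```

Each upstream item is pulled exactly once even though two readers see it, which is the property the real cache provides for Arrow batches.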
- Parameters:
- reader : object
Any object implementing __arrow_c_stream__ (e.g. pyarrow.Table, pyarrow.RecordBatchReader).
- memory_capacity : int, optional
Hot-layer budget in bytes for disk mode. Defaults to total physical RAM. Ignored in memory-only mode.
- disk_path : str, optional
Directory for the on-disk IPC file. Created on first use. Must be provided together with disk_capacity.
- disk_capacity : int, optional
On-disk storage budget in bytes. Must be provided together with disk_path.
- write_policy : str, optional
Controls when batches are written to disk (disk mode only; ignored in memory-only mode). "on_insertion" (default) writes every batch to disk immediately. "on_eviction" keeps batches in the hot layer and writes them to disk only when they are evicted, so a hot layer large enough to hold the whole stream never touches disk.
- max_readers : int, optional
Hard cap on the total number of readers ever created from this cache. When set, batches are evicted once all readers have advanced past them, enabling bounded-memory streaming. Eviction only begins after all max_readers readers have been created; until then, every batch is retained so that readers created later can still replay from the start. Dropping a reader does not free a slot: once max_readers readers have been created, no more can be obtained. reader(from_start=True) raises ValueError if batch 0 has already been evicted. For disk-backed caches, eviction frees memory (the hot layer and index) but not bytes in the append-only cache file; disk space is reclaimed only when the cache is closed or dropped. When None (default), all batches are retained indefinitely.
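The max_readers eviction rule can be modelled with a small pure-Python toy, assuming eviction means discarding every cached batch below the slowest reader's position. ToyEvictingCache, new_reader, read, and retained are hypothetical names, not batchcorder's API. Raising ValueError once the cap is reached is also an assumption of the toy, since the exception type for that case is not stated here; disk-file behaviour and dropped readers are not modelled.

```python
from typing import Dict, List, Optional


class ToyEvictingCache:
    """Hypothetical toy of the max_readers eviction rule (not batchcorder's code)."""

    def __init__(self, batches: List[str], max_readers: Optional[int]) -> None:
        self._pending = list(batches)      # stands in for the upstream source
        self._store: Dict[int, str] = {}   # absolute batch index -> cached batch
        self._next_index = 0               # index of the next batch to ingest
        self._positions: List[int] = []    # position of every reader ever created
        self.max_readers = max_readers

    def new_reader(self) -> int:
        # Slots are never freed: the cap counts readers ever created.
        if self.max_readers is not None and len(self._positions) >= self.max_readers:
            raise ValueError("max_readers readers have already been created")
        self._positions.append(0)
        return len(self._positions) - 1    # reader id

    def read(self, reader_id: int) -> Optional[str]:
        pos = self._positions[reader_id]
        if pos == self._next_index:        # this reader is at the ingest frontier
            if not self._pending:
                return None                # upstream exhausted
            self._store[pos] = self._pending.pop(0)
            self._next_index += 1
        batch = self._store[pos]
        self._positions[reader_id] = pos + 1
        self._evict()
        return batch

    def _evict(self) -> None:
        # Until every allowed reader exists, keep everything so late readers can replay.
        if self.max_readers is None or len(self._positions) < self.max_readers:
            return
        slowest = min(self._positions)
        for index in [i for i in self._store if i < slowest]:
            del self._store[index]

    @property
    def retained(self) -> List[int]:
        return sorted(self._store)


cache = ToyEvictingCache(["b0", "b1", "b2"], max_readers=2)
a = cache.new_reader()
cache.read(a)
cache.read(a)    # retained == [0, 1]: the second reader does not exist yet
b = cache.new_reader()
cache.read(b)    # retained == [1]: both readers are past batch 0
cache.read(b)    # retained == []
```

With max_readers=None the toy never evicts, matching the documented default of retaining all batches.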
Examples
Memory-only:
>>> import pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"id": [1, 2, 3], "val": [0.5, 1.0, 1.5]})
>>> ds = StreamCache(table)
>>> pa.RecordBatchReader.from_stream(ds).read_all().equals(table)
True

Disk mode:
>>> import tempfile
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, memory_capacity=16 << 20, disk_path=tmp, disk_capacity=64 << 20)
>>> pa.RecordBatchReader.from_stream(ds).read_all().equals(table)
True
>>> ds.upstream_exhausted
True
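The disk-mode example above uses the default write_policy, "on_insertion". The difference between the two policies can be sketched as a toy simulation that only counts disk writes. It is a hypothetical model, not batchcorder's code: simulate_write_policy is an illustrative name, batches are represented only by their byte sizes, and the hot layer is assumed to evict the oldest batch first, which this page does not specify.

```python
from collections import OrderedDict
from typing import List, Tuple


def simulate_write_policy(
    batch_sizes: List[int], hot_capacity: int, policy: str
) -> Tuple[int, List[int]]:
    """Hypothetical toy: return (disk writes, batch indices left in the hot layer)."""
    if policy not in ("on_insertion", "on_eviction"):
        raise ValueError(f"unknown policy: {policy}")
    hot: "OrderedDict[int, int]" = OrderedDict()  # batch index -> size in bytes
    used = 0
    disk_writes = 0
    for index, size in enumerate(batch_sizes):
        if policy == "on_insertion":
            disk_writes += 1  # every batch is written as soon as it is ingested
        hot[index] = size
        used += size
        while used > hot_capacity and hot:
            # Evict the oldest batch first (an assumption of this toy).
            _, evicted_size = hot.popitem(last=False)
            used -= evicted_size
            if policy == "on_eviction":
                disk_writes += 1  # written only when pushed out of RAM
    return disk_writes, list(hot)


# A 30-byte stream with a 100-byte hot layer: only on_insertion touches disk.
print(simulate_write_policy([10, 10, 10], 100, "on_insertion"))  # (3, [0, 1, 2])
print(simulate_write_policy([10, 10, 10], 100, "on_eviction"))   # (0, [0, 1, 2])
# A 15-byte hot layer forces evictions, so on_eviction writes the evicted batches.
print(simulate_write_policy([10, 10, 10], 15, "on_eviction"))    # (2, [2])
```

The trade-off the toy shows: "on_insertion" pays a write per batch up front, while "on_eviction" defers writes and skips them entirely when the stream fits in the hot layer.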
- property schema : pyarrow.Schema
Arrow schema of this dataset.
- Return type:
pyarrow.Schema
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"id": [1, 2], "val": [0.5, 1.0]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> [f.name for f in ds.schema]
['id', 'val']
- property ingested_count : int
Number of batches pulled from the upstream source so far.
Increments lazily as readers consume batches.
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.ingested_count
0
>>> ds.ingest_all()
1
>>> ds.ingested_count
1
- property upstream_exhausted : bool
True once the upstream source has been fully consumed.
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.upstream_exhausted
False
>>> ds.ingest_all()
1
>>> ds.upstream_exhausted
True
- reader(from_start: bool = True) StreamCacheReader
Return a new StreamCacheReader handle.
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> r1 = ds.reader()
>>> r2 = ds.reader()
>>> r1.closed, r2.closed
(False, False)
- __iter__() StreamCacheReader
Iterate over all batches from the start.
Creates a fresh StreamCacheReader starting at batch 0 and returns it as the iterator.
- Return type:
StreamCacheReader
- __arrow_c_stream__(requested_schema: Any = None) Any
Enable Arrow stream export via the PyCapsule Interface.
This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.
Creates a fresh reader starting at batch 0. Allows the dataset to be consumed directly by PyArrow, DuckDB, DataFusion, and any other Arrow-compatible library.
- __arrow_c_schema__() Any
Enable Arrow schema export via the PyCapsule Interface.
This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.
This allows Arrow consumers to inspect the data type of this StreamCache. The consumer can then ask the producer (in __arrow_c_stream__) to cast the exported data to a supported data type.
- cast(target_schema: Any) CastingStreamCache
Cast the dataset to produce batches with the given schema.
Returns a CastingStreamCache, a replayable wrapper that applies the schema cast on every read. Unlike pyarrow.RecordBatchReader.cast(), the result can be consumed multiple times, making it suitable for DuckDB self-joins and ASOF joins.
- Parameters:
- target_schema : object
Any Arrow schema-compatible object (e.g. pyarrow.Schema).
- Return type:
CastingStreamCache
- ingest_all() int
Eagerly ingest all batches from the upstream source into the cache.
After this call, upstream_exhausted is True and the upstream reference is released. Subsequent reads are served entirely from the cache. Calling this method more than once is safe and idempotent.
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.ingest_all()
1
>>> ds.upstream_exhausted
True
- close() None
Close the dataset and destroy the underlying storage.
This method clears the hybrid cache and destroys the disk storage, removing any unused files that were eagerly created.
- Return type:
None
Examples
>>> import tempfile, pyarrow as pa
>>> from batchcorder import StreamCache
>>> table = pa.table({"x": [1, 2, 3]})
>>> tmp = tempfile.mkdtemp()
>>> ds = StreamCache(table, 16 << 20, tmp, 64 << 20)
>>> ds.close()
- class batchcorder.StreamCacheReader(impl: _batchcorder.StreamCacheReader)
A single-use iterator handle for a StreamCache.
Maintains an independent read position. Multiple handles backed by the same dataset share the underlying cache; the upstream source is ingested lazily as needed.
Once consumed via __arrow_c_stream__ or by exhausting iteration, the reader is marked closed and raises an error on further use.
Notes
Obtain a handle from StreamCache.reader() rather than constructing one directly.
- property schema : pyarrow.Schema
Arrow schema of batches produced by this reader.
- Return type:
pyarrow.Schema
- Raises:
ValueError – If the reader has already been consumed.
- __arrow_c_stream__(requested_schema: Any = None) Any
Enable Arrow stream export via the PyCapsule Interface.
This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.
Consumes the reader; subsequent calls raise an error.
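The single-use contract described for StreamCacheReader (closed after exhaustion or export, error on further use) can be sketched with a toy class. ToySingleUseReader and its export method are hypothetical: export stands in for __arrow_c_stream__, strings stand in for batches, and raising ValueError on reuse is an assumption based on the Raises entries on this page.

```python
from typing import Iterator, List


class ToySingleUseReader:
    """Hypothetical toy of a single-use reader handle (not batchcorder's code)."""

    def __init__(self, batches: List[str]) -> None:
        self._it: Iterator[str] = iter(batches)
        self.closed = False

    def _check_open(self) -> None:
        if self.closed:
            raise ValueError("reader has already been consumed")

    def __iter__(self) -> "ToySingleUseReader":
        return self  # iteration uses the handle itself

    def __next__(self) -> str:
        self._check_open()
        try:
            return next(self._it)
        except StopIteration:
            self.closed = True  # exhausting iteration closes the reader
            raise

    def export(self) -> List[str]:
        """Stand-in for __arrow_c_stream__: hands over the remaining batches and closes."""
        self._check_open()
        self.closed = True
        return list(self._it)


r = ToySingleUseReader(["b0", "b1"])
assert list(r) == ["b0", "b1"] and r.closed
try:
    next(r)  # any further use is an error
except ValueError as exc:
    print(exc)  # reader has already been consumed
```

To scan the same data again, the real API's route is to obtain a fresh handle from StreamCache.reader() rather than reusing a consumed one.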
- __arrow_c_schema__() Any
Enable Arrow schema export via the PyCapsule Interface.
This dunder method should not be called directly, but enables zero-copy data transfer to other Python libraries that understand Arrow memory.
This allows Arrow consumers to inspect the data type of this StreamCacheReader. The consumer can then ask the producer (in __arrow_c_stream__) to cast the exported data to a supported data type.
- Returns:
An Arrow C schema capsule for the reader’s schema.
- Return type:
PyCapsule
- Raises:
ValueError – If the reader has already been consumed.
- __iter__() StreamCacheReader
Return self as the iterator.
- Return type:
StreamCacheReader
- cast(target_schema: Any) pyarrow.RecordBatchReader
Cast the reader to produce batches with the given schema.
Mirrors pyarrow.RecordBatchReader.cast(). Returns a pyarrow.RecordBatchReader that applies the cast as batches are read. Consumes this reader.
- Parameters:
- target_schema : object
Any Arrow schema-compatible object (e.g. pyarrow.Schema).
- Return type:
pyarrow.RecordBatchReader
- Raises:
ValueError – If the reader has already been consumed.
- __next__() pyarrow.RecordBatch
Get the next batch from the reader.
- Return type:
pyarrow.RecordBatch
- class batchcorder.CastingStreamCache(impl: _batchcorder.CastingStreamCache)
A replayable cast view of a StreamCache.
Created by StreamCache.cast(). Each call to __arrow_c_stream__ produces a fresh reader from the underlying cache with each batch cast to schema, so this object is replayable: DuckDB self-joins, ASOF joins, and other multi-scan consumers work correctly on it.
Notes
Obtain via StreamCache.cast() rather than constructing directly.
- property schema : pyarrow.Schema
Arrow schema produced by this dataset after casting.
- Return type:
pyarrow.Schema
- __arrow_c_stream__(requested_schema: Any = None) Any
Enable Arrow stream export via the PyCapsule Interface.
Creates a fresh reader from the underlying cache and applies the cast. Safe to call multiple times; each call produces an independent stream.
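Why a fresh stream per call matters for multi-scan consumers can be shown with a toy comparison in plain Python. A generator plays the role of a one-shot cast reader such as pyarrow.RecordBatchReader.cast(), and ToyCastView (a hypothetical name) plays the role of CastingStreamCache by starting a fresh pass on every scan. self_join_pairs is an illustrative consumer that, like a self-join, reads its input twice; integers stand in for batches and float() for the schema cast.

```python
from typing import Callable, Iterable, Iterator, List, Tuple


class ToyCastView:
    """Hypothetical toy replayable view: each scan starts a fresh, cast pass."""

    def __init__(self, source: List[int], cast: Callable[[int], float]) -> None:
        self._source = source  # stands in for the cached StreamCache
        self._cast = cast      # stands in for the schema cast

    def __iter__(self) -> Iterator[float]:
        # Fresh reader per call, mirroring CastingStreamCache.__arrow_c_stream__.
        return (self._cast(batch) for batch in self._source)


def self_join_pairs(scan: Iterable[float]) -> List[Tuple[float, float]]:
    """A consumer that scans its input twice, as a self-join does."""
    left = list(scan)
    right = list(scan)
    return [(a, b) for a in left for b in right if a == b]


source = [1, 2, 3]
one_shot = (float(x) for x in source)  # single pass, like RecordBatchReader.cast()
replayable = ToyCastView(source, float)

print(self_join_pairs(one_shot))    # [] because the second scan finds nothing left
print(self_join_pairs(replayable))  # [(1.0, 1.0), (2.0, 2.0), (3.0, 3.0)]
```

The one-shot source silently yields an empty second scan rather than failing, which is why a replayable view is the safer input for consumers that scan more than once.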
- __arrow_c_schema__() Any
Enable Arrow schema export via the PyCapsule Interface.
Returns the target schema so consumers can inspect the post-cast type.
- cast(target_schema: Any) CastingStreamCache
Cast to a further target schema, returning a new CastingStreamCache.