Add nodes to convert between csp structs and arrow record batches#680

Open
arhamchopra wants to merge 3 commits into main from ac/arrow_nodes
Conversation


arhamchopra commented Feb 19, 2026

Add record_batches_to_struct and struct_to_record_batches nodes

Two new C++-backed nodes for bidirectional conversion between csp.Struct and Arrow RecordBatch:

  • struct_to_record_batches: ts[List[T]] → ts[List[pa.RecordBatch]]
  • record_batches_to_struct: ts[List[pa.RecordBatch]] → ts[List[T]]

Both use the Arrow C Data Interface (PyCapsule protocol) to cross the Python/C++ boundary without serialization overhead. Scalar and temporal fields are read/written entirely in C++. Numpy array fields use bulk memory transfers rather than per-element iteration.

Supported types

bool, int8/16/32/64, uint8/16/32/64, double, str, bytes, datetime, timedelta, date, time, csp.Enum, nested csp.Struct, Numpy1DArray[T], and NumpyNDArray[T].

Performance

Read and write paths are columnar: each column is processed in a single readAll() / writeAll() call per batch rather than row-by-row virtual dispatch, reducing the virtual call count from N×M (rows × columns) to M. The readAll() path uses a null_count==0 fast path that skips per-row validity checks entirely. Fixed-size writers use UnsafeAppend after a single Reserve() to skip per-row capacity and status checks. Nested struct readers pre-allocate all child structs before delegating to child readAll() calls column by column.
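The N×M-to-M reduction can be illustrated with a toy Python model (this mirrors the dispatch-count idea only, not csp's actual C++ readers):

```python
# Count "virtual calls" in a columnar vs a row-oriented conversion.
calls = {"columnar": 0, "row_oriented": 0}

columns = {"price": [1.0, 2.0, 3.0, 4.0], "qty": [10, 20, 30, 40]}
n_rows, n_cols = 4, len(columns)

def read_all(name, values, rows):
    # One call handles every row of this column.
    calls["columnar"] += 1
    for i, v in enumerate(values):
        rows[i][name] = v

# Columnar path: M calls, one per column.
rows = [{} for _ in range(n_rows)]
for name, values in columns.items():
    read_all(name, values, rows)

def read_cell(name, value, row):
    calls["row_oriented"] += 1
    row[name] = value

# Row-oriented path: N x M calls, one per cell.
rows2 = [{} for _ in range(n_rows)]
for i in range(n_rows):
    for name, values in columns.items():
        read_cell(name, values[i], rows2[i])

assert calls["columnar"] == n_cols              # M
assert calls["row_oriented"] == n_rows * n_cols  # N x M
assert rows == rows2                             # same result either way
```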

Throughput on 100k numeric rows (int64 + float64): ~6.4M rows/sec read, ~800k rows/sec write (write dominated by CSP graph overhead).

Stress tests comparing reading compressed parquet files through both the Parquet adapter (ParquetReader.subscribe_all) and the Arrow path (RecordBatchPullInputAdapter → record_batches_to_struct):

| Scenario | Arrow | Parquet | Speedup |
| --- | --- | --- | --- |
| 500k rows, 10 timestamps (50k rows/ts) | 4.0M rows/s | 1.8M rows/s | 2.3x |
| 100k rows, 1k timestamps (100 rows/ts) | 2.6M rows/s | 2.0M rows/s | 1.3x |
| 1M rows, 10k timestamps (100 rows/ts) | 2.4M rows/s | 2.1M rows/s | 1.15x |
| 100k rows, 100k timestamps (1 row/ts) | 76k rows/s | 2.0M rows/s | 26x slower |

Arrow's advantage grows with larger batches per timestamp. The 1-row-per-timestamp case exposes per-tick overhead from RecordBatchPullInputAdapter creating and converting individual single-row batches — the Parquet adapter handles this natively since its C++ reader emits one struct per row without intermediate Arrow batches.

Example

import csp
import pyarrow as pa
from csp.adapters.arrow import record_batches_to_struct, struct_to_record_batches


class SensorReading(csp.Struct):
    temperature: float
    humidity: float
    station: str


# field_map is always {struct_field_name: arrow_column_name}
field_map = {
    "temperature": "temp_c",
    "humidity": "rel_humidity",
    "station": "station_id",
}

schema = pa.schema([
    ("temp_c", pa.float64()),
    ("rel_humidity", pa.float64()),
    ("station_id", pa.utf8()),
])


@csp.graph
def roundtrip(readings: csp.ts[csp.typing.List[SensorReading]]):
    batches = struct_to_record_batches(readings, SensorReading, field_map)
    restored = record_batches_to_struct(batches, SensorReading, field_map, schema)
    csp.print("restored", restored)

Numpy fields are auto-detected from struct metadata. 1D arrays are stored as list<T>; NDArrays as list<T> plus a list<int64> shape column (default name: <column>_csp_dimensions).

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
arhamchopra marked this pull request as ready for review February 20, 2026. Label: type: feature (Issues and PRs related to new features).