Skip to content

Rewrite the parquet input adapter manager#704

Open
arhamchopra wants to merge 15 commits into
mainfrom
ac/parquet_input_adapter
Open

Rewrite the parquet input adapter manager#704
arhamchopra wants to merge 15 commits into
mainfrom
ac/parquet_input_adapter

Conversation

@arhamchopra
Copy link
Copy Markdown
Collaborator

@arhamchopra arhamchopra commented Apr 23, 2026

Rewrite the parquet input adapter for RecordBatch-based streaming

Replaces the old ParquetReader / ParquetReaderColumnAdapter class hierarchy with a new three-layer architecture that operates on Arrow RecordBatch data directly. The new design is simpler (fewer virtual calls, no per-type reader subclasses) and supports reading from parquet files, Arrow IPC streams, and in-memory Arrow Tables through a unified RecordBatchStreamSource interface.

Motivation

The old implementation had:

  • A deep class hierarchy (FileReaderWrapperParquetFileReaderWrapper, per-type ReaderColumnAdapter subclasses) that was hard to extend
  • No support for the Arrow C Stream Interface (couldn't accept RecordBatchReader from external sources)
  • Row-by-row Parquet reads without RecordBatch-level column projection
  • ~2500 lines of boilerplate reader classes that duplicated Arrow's own type dispatch

The new implementation:

  • Reads entire RecordBatches and iterates rows within them (cache-friendly columnar access)
  • Uses Arrow's native column projection to read only requested columns from parquet
  • Exposes a RecordBatchStreamSource interface that cleanly separates file management from row processing
  • Reduces total C++ reader code by ~40%

Architecture

RecordBatchStreamSource          — file/stream boundary management
  └→ RecordBatchRowProcessor     — row-level cursor across N sources, alignment validation
       └→ ColumnDispatcher       — per-column type-erased read + dispatch to csp adapters
            └→ FieldReader       — Arrow array → typed value extraction (incl. nested structs)

RecordBatchStreamSource (new interface) abstracts file iteration with two implementations:

  • NativeParquetStreamSource — C++ opens parquet files directly with leaf-level column projection
  • PyRecordBatchStreamSource — Python yields RecordBatchReader objects via Arrow C Stream Interface (IPC, memory tables)

RecordBatchRowProcessor (new) binds to N RecordBatchReader* (one per split-column file), provides readRowAndAdvance() / skipRow() / dispatchRow(). Validates split-column alignment at runtime.

ColumnDispatcher (new) is a type-erased wrapper combining FieldReader + value storage + ValueDispatcher, one per subscribed column.

ParquetInputAdapterManager is rewritten to orchestrate via the above layers. It no longer touches Arrow arrays directly.

What's removed

  • ParquetReader / ParquetReaderColumnAdapter (~2500 lines) — the old per-type reader class hierarchy
  • FileReaderWrapper / ParquetFileReaderWrapper / ArrowIPCFileReaderWrapper — old file abstractions
  • DialectGenericListReaderInterface — unused reader interface
  • Dead m_rbSources member in DictBasketReaderRecord (declared/cleared but never populated)

Bug fixes

  • Nested struct column projection: Parquet stores struct sub-fields as separate leaf columns. The old projection logic used Arrow field indices directly, causing only the first sub-field to be read for struct columns. Added countLeafColumns() to correctly expand struct fields into all their leaf indices.
  • Null deref on schema change: if the time column disappears between files with allow_missing_columns=True, the adapter now throws a clear RuntimeError instead of segfaulting.
  • Stale basket processor sources: when basket columns are absent from a new stream, basket processor sources are now explicitly cleared, preventing use-after-free of dangling reader pointers.
  • Split-column row count alignment: runtime validation that all split-column files have matching row counts per batch.

Performance

Benchmarked on Linux (Python 3.13, Arrow 23.0.1, GCC 14.3) across 73 scenarios covering scale, types, read patterns, row groups, sources, and access patterns. Each scenario runs 5 iterations with 2 warmup runs; results validated with coefficient-of-variation trust indicators.

Category Speedup Notes
Large scale (10M–1B rows) 1.17–1.34x Prefetch overlap hides IO latency at scale
Medium scale (1M–5M rows) 1.13–1.17x Batch-oriented decode amortizes setup cost
Structs 1.13–1.45x Bulk sub-field read eliminates per-field virtual dispatch
Partial reads & filters 1.08–1.63x Column projection + early row skipping
Symbol filtering 1.17–1.47x Faster row skip logic in new architecture
Row group sizes 1.14–1.22x Smaller row groups benefit more from prefetch overlap
Multi-file / split-column 1.05–1.22x Better source orchestration across files
Small workloads (<500K) 1.06–1.14x Modest gains; CSP graph dispatch dominates

API compatibility

The public Python API (ParquetReader.subscribe, subscribe_all, subscribe_dict_basket) is unchanged. All 128 existing + new tests pass (covering all Arrow types, null handling, struct projection, split columns, dict baskets, multi-file, IPC, and in-memory tables).

@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from a40d7a1 to bc4b134 Compare April 23, 2026 17:50
@timkpaine timkpaine added type: feature Issues and PRs related to new features adapter: parquet Issues and PRs related to our Apache Parquet/Arrow adapter labels Apr 23, 2026
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from bc4b134 to 5122477 Compare April 27, 2026 21:37
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…iguity

The introduction of namespace csp::adapters::arrow (for the new
ColumnDispatcher/RecordBatchRowProcessor classes) creates ambiguity when
writer-side headers use unqualified arrow:: inside namespace
csp::adapters::parquet. The compiler finds the sibling csp::adapters::arrow
namespace before the global ::arrow namespace.

Also forward-declares ColumnDispatcher and RecordBatchRowProcessor in
ParquetInputAdapterManager.h (moving full includes to .cpp) and adds
direct includes for csp/core/Exception.h and arrow/table.h that were
previously provided transitively through the now-deleted reader headers.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from 86d9f43 to 2ac4688 Compare April 28, 2026 15:21
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra marked this pull request as ready for review April 30, 2026 01:16
Comment thread cpp/csp/adapters/parquet/ArrowSingleColumnArrayBuilder.h
Comment thread cpp/csp/adapters/parquet/ParquetInputAdapterManager.h Outdated
Comment thread cpp/csp/adapters/parquet/ParquetInputAdapterManager.cpp Outdated
Comment thread cpp/csp/adapters/parquet/ParquetInputAdapterManager.cpp Outdated
Comment thread cpp/csp/adapters/arrow/ColumnDispatcher.cpp
Comment thread cpp/csp/adapters/arrow/RecordBatchRowProcessor.cpp Outdated
Comment thread cpp/csp/adapters/arrow/RecordBatchRowProcessor.cpp Outdated
Comment thread cpp/csp/adapters/parquet/ParquetInputAdapterManager.cpp Outdated
Comment thread csp/adapters/parquet.py
Comment thread cpp/csp/adapters/arrow/ArrowFieldReader.h Outdated
…tring lambdas

- Remove unused m_basketSymbolColumn member
- Remove dead properties.get line before CSP_THROW
- Hoist phase variable out of loop
- Use generic lambda for string/binary extraction
- Leave timeUnitMultiplier inline because constexpr fails with CSP_THROW under this C++20 build

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…logic, hoist columns

- Make SourceEntry private in RecordBatchRowProcessor
- Make m_arrowTypeId and m_columnName const
- Replace duplicated first-batch loop with fetchNextBatch call
- Hoist columns() out of rebindSource loop

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Comment thread cpp/csp/adapters/arrow/ColumnDispatcher.cpp Outdated
…rride

- Make viewToString lambda static in createColumnDispatcher
- Remove m_defaultTimezone member, validate tz inline and discard
- Remove redundant doReadNextValue override from LambdaReader (base class handles it)

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from 1336cda to c72a9d8 Compare May 13, 2026 17:49
Add PrefetchingRecordBatchReader that decodes the next RecordBatch on a
background thread while CSP processes the current batch. Also enable
Arrow's use_threads and pre_buffer for parallel column decoding and IO
range caching.

The PrefetchingRecordBatchReader co-owns the FileReader (via shared_ptr)
to guarantee the FileReader outlives the background prefetch thread,
even when CSP stops mid-file.

Benchmarks show ~15% average speedup, up to 1.5x on filtered reads and
wide structs, with no regressions.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra force-pushed the ac/parquet_input_adapter branch from 930c0b8 to 662da22 Compare May 19, 2026 17:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

adapter: parquet Issues and PRs related to our Apache Parquet/Arrow adapter type: feature Issues and PRs related to new features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants