Rewrite the parquet input adapter manager#704
Open
arhamchopra wants to merge 15 commits into
Open
Conversation
a40d7a1 to
bc4b134
Compare
bc4b134 to
5122477
Compare
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…iguity The introduction of namespace csp::adapters::arrow (for the new ColumnDispatcher/RecordBatchRowProcessor classes) creates ambiguity when writer-side headers use unqualified arrow:: inside namespace csp::adapters::parquet. The compiler finds the sibling csp::adapters::arrow namespace before the global ::arrow namespace. Also forward-declares ColumnDispatcher and RecordBatchRowProcessor in ParquetInputAdapterManager.h (moving full includes to .cpp) and adds direct includes for csp/core/Exception.h and arrow/table.h that were previously provided transitively through the now-deleted reader headers. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
86d9f43 to
2ac4688
Compare
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…tring lambdas - Remove unused m_basketSymbolColumn member - Remove dead properties.get line before CSP_THROW - Hoist phase variable out of loop - Use generic lambda for string/binary extraction - Leave timeUnitMultiplier inline because constexpr fails with CSP_THROW under this C++20 build Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…logic, hoist columns - Make SourceEntry private in RecordBatchRowProcessor - Make m_arrowTypeId and m_columnName const - Replace duplicated first-batch loop with fetchNextBatch call - Hoist columns() out of rebindSource loop Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
…rride - Make viewToString lambda static in createColumnDispatcher - Remove m_defaultTimezone member, validate tz inline and discard - Remove redundant doReadNextValue override from LambdaReader (base class handles it) Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1336cda to
c72a9d8
Compare
Add PrefetchingRecordBatchReader that decodes the next RecordBatch on a background thread while CSP processes the current batch. Also enable Arrow's use_threads and pre_buffer for parallel column decoding and IO range caching. The PrefetchingRecordBatchReader co-owns the FileReader (via shared_ptr) to guarantee the FileReader outlives the background prefetch thread, even when CSP stops mid-file. Benchmarks show ~15% average speedup, up to 1.5x on filtered reads and wide structs, with no regressions. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
930c0b8 to
662da22
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Rewrite the parquet input adapter for RecordBatch-based streaming
Replaces the old
ParquetReader/ParquetReaderColumnAdapterclass hierarchy with a new three-layer architecture that operates on ArrowRecordBatchdata directly. The new design is simpler (fewer virtual calls, no per-type reader subclasses) and supports reading from parquet files, Arrow IPC streams, and in-memory Arrow Tables through a unifiedRecordBatchStreamSourceinterface.Motivation
The old implementation had:
FileReaderWrapper→ParquetFileReaderWrapper, per-typeReaderColumnAdaptersubclasses) that was hard to extendRecordBatchReaderfrom external sources)The new implementation:
RecordBatchStreamSourceinterface that cleanly separates file management from row processingArchitecture
RecordBatchStreamSource(new interface) abstracts file iteration with two implementations:NativeParquetStreamSource— C++ opens parquet files directly with leaf-level column projectionPyRecordBatchStreamSource— Python yieldsRecordBatchReaderobjects via Arrow C Stream Interface (IPC, memory tables)RecordBatchRowProcessor(new) binds to NRecordBatchReader*(one per split-column file), providesreadRowAndAdvance()/skipRow()/dispatchRow(). Validates split-column alignment at runtime.ColumnDispatcher(new) is a type-erased wrapper combiningFieldReader+ value storage +ValueDispatcher, one per subscribed column.ParquetInputAdapterManageris rewritten to orchestrate via the above layers. It no longer touches Arrow arrays directly.What's removed
ParquetReader/ParquetReaderColumnAdapter(~2500 lines) — the old per-type reader class hierarchyFileReaderWrapper/ParquetFileReaderWrapper/ArrowIPCFileReaderWrapper— old file abstractionsDialectGenericListReaderInterface— unused reader interfacem_rbSourcesmember inDictBasketReaderRecord(declared/cleared but never populated)Bug fixes
countLeafColumns()to correctly expand struct fields into all their leaf indices.allow_missing_columns=True, the adapter now throws a clearRuntimeErrorinstead of segfaulting.Performance
Benchmarked on Linux (Python 3.13, Arrow 23.0.1, GCC 14.3) across 73 scenarios covering scale, types, read patterns, row groups, sources, and access patterns. Each scenario runs 5 iterations with 2 warmup runs; results validated with coefficient-of-variation trust indicators.
API compatibility
The public Python API (
ParquetReader.subscribe,subscribe_all,subscribe_dict_basket) is unchanged. All 128 existing + new tests pass (covering all Arrow types, null handling, struct projection, split columns, dict baskets, multi-file, IPC, and in-memory tables).