
[AURON #1851] Introduce Arrow to Flink RowData reader#2063

Open
weiqingy wants to merge 8 commits into apache:master from weiqingy:wiyang/AURON-1851-arrow-reader

Conversation

weiqingy commented Mar 3, 2026

Which issue does this PR close?

Closes #1851

Rationale for this change

Per AIP-1, the Flink integration data path requires converting Arrow vectors returned by the native engine (DataFusion/Rust) back into Flink RowData so downstream Flink operators can process results.

What changes are included in this PR?

  • FlinkArrowReader orchestrator — zero-copy columnar access via ColumnarRowData + VectorizedColumnBatch
  • 16 ArrowXxxColumnVector wrappers covering all 17 supported types (the timestamp wrapper handles both TIMESTAMP and TIMESTAMP_LTZ)
  • Decimal fromUnscaledLong optimization for precision ≤ 18
  • Batch reset support for streaming pipelines
  • 21 unit tests in FlinkArrowReaderTest
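On the Decimal bullet above: precision ≤ 18 guarantees the unscaled value fits in a signed 64-bit long (10^18 − 1 is below Long.MAX_VALUE ≈ 9.22 × 10^18), so the value can be carried as a long without a BigInteger round-trip. A minimal plain-Java sketch of the idea (not the Auron code, which reads the long directly from Arrow's data buffer):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalCompactPathDemo {
    // Compact path: for precision <= 18 the unscaled decimal value fits in a
    // long, so no BigInteger allocation is needed to carry it.
    static BigDecimal fromUnscaledLong(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale); // allocation-light path
    }

    // Wide path: arbitrary precision goes through BigInteger.
    static BigDecimal fromUnscaledBytes(byte[] unscaled, int scale) {
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        BigDecimal compact = fromUnscaledLong(123456L, 2);
        BigDecimal wide = fromUnscaledBytes(BigInteger.valueOf(123456L).toByteArray(), 2);
        System.out.println(compact);               // 1234.56
        System.out.println(compact.equals(wide));  // true
    }
}
```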

Are there any user-facing changes?

No. Internal API for Flink integration.

How was this patch tested?

21 tests: ./build/mvn test -pl auron-flink-extension/auron-flink-runtime -am -Pscala-2.12 -Pflink-1.18 -Pspark-3.5 -DskipBuildNative -Dtest=FlinkArrowReaderTest
Result: 21 pass, 0 failures.

@github-actions github-actions bot added documentation Improvements or additions to documentation flink labels Mar 3, 2026
@weiqingy weiqingy force-pushed the wiyang/AURON-1851-arrow-reader branch from ee74ab4 to d53cf08 Compare March 3, 2026 04:45
weiqingy added 5 commits March 2, 2026 20:51
Add 7 ColumnVector wrapper classes for primitive types (Boolean, TinyInt,
SmallInt, Int, BigInt, Float, Double) that implement Flink's ColumnVector
sub-interfaces backed by Arrow FieldVectors for zero-copy columnar access.
…r wrappers

Add VarChar, VarBinary, and Decimal wrappers. Decimal uses compact
fromUnscaledLong path for precision <= 18 to avoid BigDecimal allocation,
reading directly from Arrow's data buffer via ArrowBuf.getLong().
Add Date (epoch days), Time (micros->millis conversion), and Timestamp
(micros->TimestampData with millis + nanoOfMillisecond) wrappers.
Timestamp wrapper uses TimeStampVector parent to handle both TIMESTAMP
and TIMESTAMP_LTZ Arrow vectors.
Add Array (ListVector), Map (MapVector), and Row (StructVector) wrappers
with recursive child vector wrapping. Array/Map use offset buffers for
element access; Row uses VectorizedColumnBatch with reusable ColumnarRowData.
FlinkArrowReader creates ColumnVector wrappers from VectorSchemaRoot +
RowType, provides read(int) returning ColumnarRowData, getRowCount(),
reset(VectorSchemaRoot) for batch reuse, and AutoCloseable. Dispatches
LogicalTypeRoot to the correct wrapper via switch on all 17 supported types.
@weiqingy weiqingy force-pushed the wiyang/AURON-1851-arrow-reader branch 2 times, most recently from b3d5562 to a1be6f6 Compare March 3, 2026 04:53

weiqingy commented Mar 3, 2026

@Tartarus0zm Could you please take a look at the PR when you have a chance?

I split it into multiple commits and added a review_help.md file to make the review easier. Once the review is complete, I’ll remove the helper file.

Thank you!

Tartarus0zm (Contributor) commented:

@weiqingy thanks for your contribution, waiting for CI green

@Tartarus0zm Tartarus0zm self-requested a review March 3, 2026 06:22
21 test methods covering all 17 supported types with values and null
handling, plus integration tests for multi-column batch, empty batch,
reader reset, and unsupported type exception.
@weiqingy weiqingy force-pushed the wiyang/AURON-1851-arrow-reader branch from a1be6f6 to ffcf6d6 Compare March 3, 2026 06:46
weiqingy commented Mar 3, 2026

@weiqingy thanks for your contribution, waiting for CI green

Thank you @Tartarus0zm! I just pushed a fix: the CI failure was due to List.of(), which requires Java 9+, while CI runs on JDK 8. I replaced it with Collections.singletonList() in the test file. I force-pushed to keep the commit history clean for reviewers, so CI may need another workflow approval. Let me know if anything else is needed!
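For reference, the JDK 8-compatible construction used in the fix (a trivial standalone example):

```java
import java.util.Collections;
import java.util.List;

public class SingletonListDemo {
    public static void main(String[] args) {
        // JDK 8-compatible single-element immutable list;
        // List.of("col") would require Java 9+.
        List<String> cols = Collections.singletonList("col");
        System.out.println(cols.size());  // 1
        System.out.println(cols.get(0));  // col
    }
}
```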

Copilot AI left a comment

Pull request overview

This PR implements the Arrow-to-Flink RowData conversion layer (AURON #1851), the reverse of the Flink-to-Arrow writer from PR #1930. It enables native engine results (in Arrow format) to be read back into Flink's RowData for downstream operator processing, completing the second half of the Flink-native engine round-trip defined in AIP-1.

Changes:

  • 16 ArrowXxxColumnVector wrapper classes providing zero-copy delegation from Arrow FieldVector types to Flink ColumnVector sub-interfaces
  • FlinkArrowReader orchestrator that maps Arrow fields to the correct column vector wrappers via LogicalType, providing read(int), getRowCount(), and reset(VectorSchemaRoot) APIs
  • FlinkArrowReaderTest with 21 unit and integration tests covering all 17 supported types and batch-reset behavior

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.

Summary per file:

FlinkArrowReader.java: Orchestrator; creates wrappers from VectorSchemaRoot + RowType, returns ColumnarRowData via read(), supports batch reset()
ArrowBooleanColumnVector.java: Wraps BitVector → BooleanColumnVector (!= 0 conversion)
ArrowTinyIntColumnVector.java: Wraps TinyIntVector → ByteColumnVector
ArrowSmallIntColumnVector.java: Wraps SmallIntVector → ShortColumnVector
ArrowIntColumnVector.java: Wraps IntVector → IntColumnVector
ArrowBigIntColumnVector.java: Wraps BigIntVector → LongColumnVector
ArrowFloatColumnVector.java: Wraps Float4Vector → FloatColumnVector
ArrowDoubleColumnVector.java: Wraps Float8Vector → DoubleColumnVector
ArrowDecimalColumnVector.java: Wraps DecimalVector → DecimalColumnVector with compact/wide path optimization
ArrowVarCharColumnVector.java: Wraps VarCharVector → BytesColumnVector
ArrowVarBinaryColumnVector.java: Wraps VarBinaryVector → BytesColumnVector
ArrowDateColumnVector.java: Wraps DateDayVector → IntColumnVector (epoch days)
ArrowTimeColumnVector.java: Wraps TimeMicroVector → IntColumnVector (micros ÷ 1000)
ArrowTimestampColumnVector.java: Wraps TimeStampVector → TimestampColumnVector (micros → millis + nanos)
ArrowArrayColumnVector.java: Wraps ListVector → ArrayColumnVector using the offset buffer
ArrowMapColumnVector.java: Wraps MapVector → MapColumnVector using key/value child vectors
ArrowRowColumnVector.java: Wraps StructVector → RowColumnVector with recursive child vectors
FlinkArrowReaderTest.java: 21 tests covering all 17 types, nulls, edge cases, multi-column, reset, and unsupported types
AURON-1851-DESIGN.md: Design document covering motivation, approach comparison, type table, and Q&A
docs/reviewhelper/01-06-*.md: Commit-by-commit review guides for the 6 commits in this PR


Comment on lines +212 to +220
case TIME_WITHOUT_TIME_ZONE:
// The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds
// in a TimeMicroVector, regardless of the declared Flink TIME precision.
return new ArrowTimeColumnVector((TimeMicroVector) vector);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// The writer normalizes all timestamps to microseconds. TimeStampVector is the
// common parent of TimeStampMicroVector and TimeStampMicroTZVector.
return new ArrowTimestampColumnVector((TimeStampVector) vector);
Copilot AI commented Mar 3, 2026

The createColumnVector method for TIME_WITHOUT_TIME_ZONE unconditionally casts the Arrow vector to TimeMicroVector, but FlinkArrowUtils.toArrowType() maps TIME to different Arrow time units based on precision:

  • TIME(0) → TimeSecVector (not TimeMicroVector) → ClassCastException at runtime
  • TIME(1-3) → TimeMilliVector (not TimeMicroVector) → ClassCastException at runtime
  • TIME(4-6) → TimeMicroVector ← only this precision range works correctly
  • TIME(7+) → TimeNanoVector (not TimeMicroVector) → ClassCastException at runtime

For TIMESTAMP_WITHOUT_TIME_ZONE/TIMESTAMP_WITH_LOCAL_TIME_ZONE, the cast to TimeStampVector will not fail (it is the parent class of all timestamp vectors), but the conversion micros / 1000 and % 1000 is only correct for microsecond-precision vectors. For other precisions:

  • TIMESTAMP(0) → TimeStampSecVector: vector.get(i) returns seconds, but is treated as microseconds → result is off by 10^6
  • TIMESTAMP(1-3) → TimeStampMilliVector: treated as microseconds → off by 10^3
  • TIMESTAMP(7+) → TimeStampNanoVector: treated as microseconds → off by 10^3 (nanos ÷ 1000 for millis, nanos % 1000 * 1000 for nano)

The createColumnVector method needs to dispatch on the TIME/TIMESTAMP precision (via the LogicalType's precision) and apply the correct unit conversion and vector cast for each precision range, consistent with FlinkArrowUtils.toArrowType(). The comment claiming "writer normalizes all TIME/TIMESTAMP values to microseconds regardless of declared precision" is incorrect — FlinkArrowUtils.toArrowType() selects the Arrow time unit based on precision.
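The precision-to-unit mapping described in this comment can be restated as a small standalone function (illustrative only; this is not the actual FlinkArrowUtils code):

```java
public class TimePrecisionMapping {
    enum ArrowTimeUnit { SECOND, MILLISECOND, MICROSECOND, NANOSECOND }

    // Precision-to-unit mapping as described in the review comment for
    // FlinkArrowUtils.toArrowType(): 0 -> SECOND, 1-3 -> MILLISECOND,
    // 4-6 -> MICROSECOND, 7+ -> NANOSECOND.
    static ArrowTimeUnit unitFor(int precision) {
        if (precision == 0) return ArrowTimeUnit.SECOND;
        if (precision <= 3) return ArrowTimeUnit.MILLISECOND;
        if (precision <= 6) return ArrowTimeUnit.MICROSECOND;
        return ArrowTimeUnit.NANOSECOND;
    }

    public static void main(String[] args) {
        System.out.println(unitFor(0));  // SECOND
        System.out.println(unitFor(3));  // MILLISECOND
        System.out.println(unitFor(6));  // MICROSECOND
        System.out.println(unitFor(9));  // NANOSECOND
    }
}
```

A precision-aware createColumnVector would dispatch on this unit before choosing the vector cast and conversion factor.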

weiqingy (Author) replied:

Good catch on the FlinkArrowUtils.toArrowType() mapping. The reader is designed as the inverse of the
writer (PR #1930), which per the design discussion normalizes all TIME/TIMESTAMP values to microsecond
precision in Arrow, regardless of the declared Flink precision. This is documented in the design doc
(section 3.3) and in the code comments on the TIME/TIMESTAMP switch cases.

That said, this coupling to the writer's normalization behavior is worth highlighting. If the writer's
approach changes to preserve precision-dependent time units, the reader would need to be updated to
match. Will need to confirm with @x-tong on PR #1930 that the writer does indeed normalize to microseconds.

Comment on lines +68 to +72
long millis = micros / 1000;
// micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos.
// For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative
// remainder, which is consistent with the writer's inverse conversion.
int nanoOfMillisecond = ((int) (micros % 1000)) * 1000;
Copilot AI commented Mar 3, 2026

For pre-epoch timestamps (negative micros values), Java's truncation-toward-zero in the conversion nanoOfMillisecond = ((int)(micros % 1000)) * 1000 produces a negative nanoOfMillisecond. For example, with micros = -1500: millis = -1, nanoOfMillisecond = -500000.

TimestampData.fromEpochMillis documents that nanoOfMillisecond must be in the range [0, 999999]. Passing a negative value creates a TimestampData that violates this contract and may produce incorrect behavior in downstream Flink operators that rely on this invariant.

The correct approach for pre-epoch timestamps is to use floor division and adjust the remainder to be non-negative. For micros = -1500: millis = Math.floorDiv(-1500, 1000) = -2, nanoOfMillisecond = (int)(Math.floorMod(-1500, 1000)) * 1000 = 500000.

Suggested change
long millis = micros / 1000;
// micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos.
// For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative
// remainder, which is consistent with the writer's inverse conversion.
int nanoOfMillisecond = ((int) (micros % 1000)) * 1000;
// Use floor-based division so that for negative micros (pre-epoch), the remainder is
// non-negative and nanoOfMillisecond stays within [0, 999_999], as required by
// TimestampData.fromEpochMillis.
long millis = Math.floorDiv(micros, 1000);
int nanoOfMillisecond = (int) Math.floorMod(micros, 1000) * 1000;

weiqingy (Author) replied:

Thank you! Updated the PR. Changed to Math.floorDiv / Math.floorMod so
nanoOfMillisecond stays within [0, 999_999] for pre-epoch timestamps. Added a test for micros = -1500
verifying millis = -2, nanoOfMillisecond = 500_000.
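The arithmetic in the fix can be checked with plain Java:

```java
public class PreEpochMicrosDemo {
    public static void main(String[] args) {
        long micros = -1_500L; // 1.5 ms before the epoch

        // Truncating division: remainder is negative, violating
        // TimestampData.fromEpochMillis's [0, 999_999] contract.
        long truncMillis = micros / 1000;                          // -1
        int truncNanos = ((int) (micros % 1000)) * 1000;           // -500000

        // Floor-based division: remainder is non-negative.
        long floorMillis = Math.floorDiv(micros, 1000);            // -2
        int floorNanos = (int) Math.floorMod(micros, 1000) * 1000; // 500000

        System.out.println(truncMillis + " " + truncNanos);
        System.out.println(floorMillis + " " + floorNanos);
        // Both decompositions denote the same instant:
        // -1 ms + (-500_000 ns) == -2 ms + 500_000 ns == -1_500_000 ns
    }
}
```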

Comment on lines +53 to +62

/**
* Replaces the underlying Arrow vector. Used during reader reset to point at a new batch
* without allocating a new wrapper.
*
* @param vector the new Arrow vector, must not be null
*/
void setVector(BitVector vector) {
this.vector = Preconditions.checkNotNull(vector);
}
Copilot AI commented Mar 3, 2026

The setVector() method in ArrowBooleanColumnVector (and all other ArrowXxxColumnVector wrapper classes) is dead code. The FlinkArrowReader.reset() method does not call setVector() on the existing wrappers — instead it creates entirely new wrapper instances via createColumnVector(). Since these package-private methods are not called from anywhere and have no external callers, they add maintenance overhead without serving any purpose. They should either be removed, or FlinkArrowReader.reset() should be updated to use them (which would avoid the extra wrapper allocations per batch reset).

Suggested change
/**
* Replaces the underlying Arrow vector. Used during reader reset to point at a new batch
* without allocating a new wrapper.
*
* @param vector the new Arrow vector, must not be null
*/
void setVector(BitVector vector) {
this.vector = Preconditions.checkNotNull(vector);
}

weiqingy (Author) commented Mar 3, 2026

Correct observation — FlinkArrowReader.reset() currently recreates wrappers rather than calling
setVector(). The methods were added intentionally for a future optimization: in a streaming pipeline
processing many batches with the same schema, reusing wrapper instances via setVector() avoids per-batch allocations. This is a minor cost (one method per wrapper class) that keeps the optimization path
available. Happy to remove if reviewers prefer the leaner approach.
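A toy model of the two reset strategies discussed here (recreate per batch vs. setVector reuse); all names are hypothetical, not Auron's API:

```java
import java.util.ArrayList;
import java.util.List;

public class ResetStrategiesDemo {
    static class Wrapper {
        Object vector;
        static int allocations = 0;
        Wrapper(Object v) { allocations++; vector = v; }
        void setVector(Object v) { vector = v; } // reuse: no allocation
    }

    public static void main(String[] args) {
        List<Wrapper> wrappers = new ArrayList<>();
        wrappers.add(new Wrapper("batch-0"));

        // Strategy A (current reset()): recreate the wrapper for each batch.
        for (int batch = 1; batch <= 3; batch++) {
            wrappers.set(0, new Wrapper("batch-" + batch));
        }
        System.out.println("recreate allocations: " + Wrapper.allocations); // 4

        // Strategy B (setVector): repoint the same wrapper at new batches.
        Wrapper reused = wrappers.get(0);
        int before = Wrapper.allocations;
        for (int batch = 4; batch <= 6; batch++) {
            reused.setVector("batch-" + batch);
        }
        System.out.println("reuse allocations: " + (Wrapper.allocations - before)); // 0
    }
}
```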

Comment on lines +364 to +387
@Test
public void testTimeVector() {
try (BufferAllocator allocator =
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) {
TimeMicroVector vec = new TimeMicroVector("col", allocator);
vec.allocateNew(3);
vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis
vec.setNull(1);
vec.setSafe(2, 0L);
vec.setValueCount(3);

VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
RowType rowType = RowType.of(new TimeType(6));
FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

// ArrowTimeColumnVector divides micros by 1000 to get millis
assertEquals(45_296_000, reader.read(0).getInt(0));
assertTrue(reader.read(1).isNullAt(0));
assertEquals(0, reader.read(2).getInt(0));

reader.close();
root.close();
}
}
Copilot AI commented Mar 3, 2026

The test only covers TIME(6) and TIMESTAMP(6), which are the only precisions that happen to work correctly with the current reader implementation. Tests for TIME(0), TIME(3), and TIME(9) (and their TIMESTAMP equivalents) are missing. Adding such tests would immediately expose the critical ClassCastException / incorrect unit conversion bug in FlinkArrowReader.createColumnVector for TIME/TIMESTAMP of non-microsecond precision.

weiqingy (Author) replied:

The tests cover microsecond precision because that's what the writer (PR #1930) produces. The reader is
the inverse of the writer, which normalizes all TIME/TIMESTAMP to microseconds. Testing other precisions
would require vectors that the writer never generates. If the writer's normalization approach changes,
both the reader and tests would be updated together.

Comment on lines +136 to +158
The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow. The reader must reverse this:

```
Writer path: TimestampData → microseconds (long) stored in TimeStampMicroVector
Reader path: TimeStampMicroVector → microseconds (long) → TimestampData

Conversion:
long micros = vector.get(i);
long millis = micros / 1000;
int nanoOfMillisecond = (int) (micros % 1000) * 1000;
return TimestampData.fromEpochMillis(millis, nanoOfMillisecond);
```

Similarly for Time:
```
Writer path: int millis → micros (long) stored in TimeMicroVector
Reader path: TimeMicroVector → micros (long) → millis (int)

Conversion:
return (int) (vector.get(i) / 1000);
```

This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`.
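The writer/reader conversions quoted above can be exercised with plain Java (helper names are illustrative, not the actual FlinkArrowFieldWriter methods):

```java
public class TimeRoundTripDemo {
    // Writer side (per the design doc): Flink TIME int millis -> micros.
    static long timeToMicros(int millis) { return millis * 1000L; }

    // Reader side: micros -> millis.
    static int microsToTimeMillis(long micros) { return (int) (micros / 1000); }

    // Writer side: (millis, nanoOfMillisecond) -> micros, dropping sub-micro nanos.
    static long timestampToMicros(long millis, int nanoOfMillisecond) {
        return millis * 1000 + nanoOfMillisecond / 1000;
    }

    public static void main(String[] args) {
        int timeMillis = 45_296_000; // 12:34:56 as millis-of-day
        System.out.println(microsToTimeMillis(timeToMicros(timeMillis))); // 45296000

        // Timestamp round-trip for a post-epoch value (pre-epoch values need
        // the floorDiv/floorMod fix discussed in the review thread below).
        long micros = timestampToMicros(1_700_000_000_123L, 456_000);
        long millis = micros / 1000;
        int nanoOfMillisecond = (int) (micros % 1000) * 1000;
        System.out.println(millis + " " + nanoOfMillisecond); // 1700000000123 456000
    }
}
```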
Copilot AI commented Mar 3, 2026

The design document claims "The writer (PR #1930) normalizes all timestamps to microsecond precision in Arrow" and makes the same claim for TIME types. However, FlinkArrowUtils.toArrowType() (which is the existing type-mapping utility used by the writer) maps TIME and TIMESTAMP to different Arrow time units based on the declared Flink precision (SECOND for precision 0, MILLISECOND for 1-3, MICROSECOND for 4-6, NANOSECOND for 7+). The design document's claim is inaccurate and should be corrected to reflect the actual precision-dependent mapping.

weiqingy (Author) commented Mar 3, 2026

FlinkArrowUtils.toArrowType() defines the Arrow schema type metadata, but the writer's FlinkArrowFieldWriter controls the actual data normalization. These are separate concerns — the schema
metadata says "this is a Timestamp(MICROSECOND)" and the writer ensures the values are indeed in
microseconds. The design doc's claim refers to the writer's data normalization, not toArrowType(). Will
confirm with @x-tong on PR #1930.

Tartarus0zm (Contributor) commented:

@weiqingy Given the large amount of code, I'll still need some time.

…rDiv/floorMod

Use Math.floorDiv and Math.floorMod instead of / and % to ensure
nanoOfMillisecond stays within [0, 999_999] for negative micros
(pre-epoch timestamps). Add test for negative timestamp values.
Linked issue: Introduce Arrow to Flink RowData (#1851)
3 participants