From 927825acd6cfd13ee3b0d7e9565f40cf3d930d9a Mon Sep 17 00:00:00 2001
From: matrix
Date: Sun, 10 May 2026 15:26:05 +0800
Subject: [PATCH] [flink] Fix batch fallback generating mixed split types for primary-key tables

---
 .../enumerator/FlinkSourceEnumerator.java     | 34 +++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 1a6521f57e..1b4f096052 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -377,9 +377,12 @@ private void startInBatchMode() {
                                                     p.getPartitionId(), p.getPartitionName()))
                             .collect(Collectors.toList());
-            splits = this.initPartitionedSplits(partitions);
+            // Use log-only splits to avoid generating mixed split
+            // types (HybridSnapshotLogSplit + LogSplit) for
+            // primary-key tables, which is not supported.
+            splits = this.initLogTablePartitionSplits(partitions);
         } else {
-            splits = this.initNonPartitionedSplits();
+            splits = this.getLogSplit(null, null);
         }
     }
     return splits;
 }
@@ -734,18 +737,21 @@ private List<SourceSplitBase> getLogSplit(
         }
 
         if (!bucketsNeedInitOffset.isEmpty()) {
-            startingOffsetsInitializer
-                    .getBucketOffsets(partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever)
-                    .forEach(
-                            (bucketId, startingOffset) ->
-                                    splits.add(
-                                            new LogSplit(
-                                                    new TableBucket(
-                                                            tableInfo.getTableId(),
-                                                            partitionId,
-                                                            bucketId),
-                                                    partitionName,
-                                                    startingOffset)));
+            Map<Integer, Long> startingOffsets =
+                    startingOffsetsInitializer.getBucketOffsets(
+                            partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever);
+            Map<Integer, Long> stoppingOffsets =
+                    stoppingOffsetsInitializer.getBucketOffsets(
+                            partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever);
+            startingOffsets.forEach(
+                    (bucketId, startingOffset) ->
+                            splits.add(
+                                    new LogSplit(
+                                            new TableBucket(
+                                                    tableInfo.getTableId(), partitionId, bucketId),
+                                            partitionName,
+                                            startingOffset,
+                                            stoppingOffsets.get(bucketId))));
         }
         return splits;
     }