From 85bd7df6a87d391784559272378bff491a211adb Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sat, 9 May 2026 20:15:46 +0300 Subject: [PATCH 1/7] [client] Add client.scanner.kv.fetch.max-bytes config option --- .../java/org/apache/fluss/config/ConfigOptions.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 02e016852f..35223a49fd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1272,6 +1272,17 @@ public class ConfigOptions { + CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key() + " time to return."); + public static final ConfigOption CLIENT_SCANNER_KV_FETCH_MAX_BYTES = + key("client.scanner.kv.fetch.max-bytes") + .memoryType() + .defaultValue(MemorySize.parse("4mb")) + .withDescription( + "The maximum amount of data the server should return per kv scan request when " + + "performing a full primary key table scan. 
Records are streamed in batches; " + + "the server may cap this value via '" + + KV_SCANNER_MAX_BATCH_SIZE.key() + + "'."); + public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType() From 0f8ffa2014ed2902674720b621263cf29fe19451 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sat, 9 May 2026 20:56:15 +0300 Subject: [PATCH 2/7] [client] Introduce KvBatchScanner for full PK table scans --- .../table/scanner/batch/KvBatchScanner.java | 345 ++++++++++++++ .../scanner/batch/KvBatchScannerTest.java | 430 ++++++++++++++++++ 2 files changed, 775 insertions(+) create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java new file mode 100644 index 0000000000..0245295aed --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.exception.LeaderNotAvailableException; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordReadContext; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.SchemaUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A {@link BatchScanner} that streams every live row of a single primary-key bucket from the tablet + * server's RocksDB state via a sequence of {@code ScanKv} RPCs. 
The scan has snapshot isolation: + * rows reflect the KV state at the moment the server opened the snapshot; concurrent writes after + * that point are invisible. + * + * <p>
The next continuation RPC is issued immediately after each response so the caller's + * deserialise/process work overlaps the network round-trip; one in-flight RPC at a time. + */ +@Internal +@NotThreadSafe +public final class KvBatchScanner implements BatchScanner { + + private static final Logger LOG = LoggerFactory.getLogger(KvBatchScanner.class); + + @VisibleForTesting static final int MAX_OPEN_RETRIES = 3; + @VisibleForTesting static final long BASE_RETRY_DELAY_MS = 100L; + + private final TablePath tablePath; + private final TableBucket bucket; + private final SchemaGetter schemaGetter; + private final MetadataUpdater metadataUpdater; + private final int targetSchemaId; + private final int batchSizeBytes; + @Nullable private final int[] projectedColumns; + private final Map schemaMappingCache = new HashMap<>(); + private final ValueRecordReadContext readContext; + + @Nullable private TabletServerGateway gateway; + @Nullable private byte[] scannerId; + @Nullable private CompletableFuture inFlight; + + private int callSeqId = 0; + private int openRetries = 0; + private boolean drained = false; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public KvBatchScanner( + TableInfo tableInfo, + TableBucket tableBucket, + SchemaGetter schemaGetter, + MetadataUpdater metadataUpdater, + int batchSizeBytes, + @Nullable int[] projectedColumns) { + this.tablePath = tableInfo.getTablePath(); + this.bucket = tableBucket; + this.schemaGetter = schemaGetter; + this.metadataUpdater = metadataUpdater; + this.targetSchemaId = tableInfo.getSchemaId(); + this.batchSizeBytes = Math.max(1, batchSizeBytes); + this.projectedColumns = projectedColumns; + this.readContext = + ValueRecordReadContext.createReadContext( + schemaGetter, tableInfo.getTableConfig().getKvFormat()); + } + + @Nullable + @Override + public CloseableIterator pollBatch(Duration timeout) throws IOException { + if (closed.get() || drained) { + return null; + } + if (inFlight == null) { + try { + 
openScanner(); + } catch (Exception e) { + terminate(); + throw new IOException("Failed to open scanner for bucket " + bucket, e); + } + } + + final ScanKvResponse response; + try { + response = inFlight.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + return CloseableIterator.emptyIterator(); + } catch (Exception e) { + terminate(); + throw new IOException(e); + } + inFlight = null; + + if (response.hasErrorCode() && response.getErrorCode() != 0) { + return handleErrorResponse(response); + } + + if (response.hasScannerId()) { + scannerId = response.getScannerId(); + } + + boolean hasMore = response.hasHasMoreResults() && response.isHasMoreResults(); + if (hasMore) { + sendContinuation(); + } else { + drained = true; + } + + if (!response.hasRecords()) { + return drained ? null : CloseableIterator.emptyIterator(); + } + return CloseableIterator.wrap(parseRecords(response).iterator()); + } + + @Override + public void close() throws IOException { + terminate(); + } + + private void openScanner() { + metadataUpdater.checkAndUpdateMetadata(tablePath, bucket); + int leader = metadataUpdater.leaderFor(tablePath, bucket); + gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway == null) { + throw new LeaderNotAvailableException( + "Leader for bucket " + bucket + " is not available. Please retry the scan."); + } + + PbScanReqForBucket bucketReq = + new PbScanReqForBucket() + .setTableId(bucket.getTableId()) + .setBucketId(bucket.getBucket()); + if (bucket.getPartitionId() != null) { + bucketReq.setPartitionId(bucket.getPartitionId()); + } + + ScanKvRequest request = + new ScanKvRequest().setBucketScanReq(bucketReq).setBatchSizeBytes(batchSizeBytes); + inFlight = gateway.scanKv(request); + } + + private void sendContinuation() { + // Pre-increment so the first continuation sends call_seq_id=1; the server initialises its + // counter to 0 and expects requestSeqId == contextSeqId + 1. 
+ ScanKvRequest request = + new ScanKvRequest() + .setScannerId(scannerId) + .setBatchSizeBytes(batchSizeBytes) + .setCallSeqId(++callSeqId); + inFlight = gateway.scanKv(request); + } + + private CloseableIterator handleErrorResponse(ScanKvResponse response) + throws IOException { + Errors error = Errors.forCode(response.getErrorCode()); + String message = response.hasErrorMessage() ? response.getErrorMessage() : null; + + if (error == Errors.TOO_MANY_SCANNERS + && scannerId == null + && openRetries < MAX_OPEN_RETRIES) { + long delayMs = BASE_RETRY_DELAY_MS * (1L << openRetries); + openRetries++; + LOG.debug( + "Too many scanners for bucket {}; retrying open in {} ms (attempt {}/{}).", + bucket, + delayMs, + openRetries, + MAX_OPEN_RETRIES); + try { + Thread.sleep(delayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + terminate(); + throw new IOException( + "Interrupted while backing off before scanner open retry for bucket " + + bucket, + ie); + } + try { + openScanner(); + } catch (Exception e) { + terminate(); + throw new IOException( + "Failed to open scanner for bucket " + bucket + " on retry", e); + } + return CloseableIterator.emptyIterator(); + } + + // Refresh metadata so the next caller-driven attempt resolves the new leader. Auto- + // restarting here would silently swap the RocksDB snapshot, breaking snapshot isolation. + if (error == Errors.NOT_LEADER_OR_FOLLOWER) { + try { + metadataUpdater.checkAndUpdateMetadata(tablePath, bucket); + } catch (Exception refreshFailure) { + LOG.debug( + "Metadata refresh after NotLeaderOrFollower failed for bucket {}.", + bucket, + refreshFailure); + } + terminate(); + throw new IOException(error.exception(message)); + } + + // The server-side session is already gone; skip close_scanner. 
+ if (error == Errors.SCANNER_EXPIRED || error == Errors.UNKNOWN_SCANNER_ID) { + scannerId = null; + closed.set(true); + drained = true; + throw new IOException(error.exception(message)); + } + + terminate(); + throw new IOException(error.exception(message)); + } + + private void terminate() { + if (!closed.compareAndSet(false, true)) { + return; + } + if (inFlight != null) { + inFlight.cancel(true); + inFlight = null; + } + sendBestEffortClose(); + } + + private void sendBestEffortClose() { + // After natural EOF the server already closed the session; sending close_scanner is + // unnecessary and would be a stale request. + if (scannerId == null || gateway == null || drained) { + return; + } + try { + gateway.scanKv( + new ScanKvRequest() + .setScannerId(scannerId) + .setBatchSizeBytes(batchSizeBytes) + .setCloseScanner(true)) + .whenComplete( + (resp, ex) -> { + if (ex != null) { + LOG.debug( + "close_scanner RPC failed for scanner of bucket {};" + + " server-side TTL will reclaim resources.", + bucket, + ex); + } + }); + } catch (Throwable t) { + LOG.debug("close_scanner RPC dispatch failed for bucket {}.", bucket, t); + } + } + + private List parseRecords(ScanKvResponse response) { + ByteBuffer recordsBuffer = ByteBuffer.wrap(response.getRecords()); + DefaultValueRecordBatch valueRecords = + DefaultValueRecordBatch.pointToByteBuffer(recordsBuffer); + int recordCount = valueRecords.getRecordCount(); + if (recordCount == 0) { + return Collections.emptyList(); + } + List rows = new ArrayList<>(recordCount); + for (ValueRecord record : valueRecords.records(readContext)) { + InternalRow row = record.getRow(); + if (targetSchemaId != record.schemaId()) { + int[] indexMapping = + schemaMappingCache.computeIfAbsent( + record.schemaId(), + sourceSchemaId -> + SchemaUtil.getIndexMapping( + schemaGetter.getSchema(sourceSchemaId), + schemaGetter.getSchema(targetSchemaId))); + row = ProjectedRow.from(indexMapping).replaceRow(row); + } + if (projectedColumns != null) { + row 
= ProjectedRow.from(projectedColumns).replaceRow(row); + } + rows.add(row); + } + return rows; + } + + @VisibleForTesting + boolean isClosed() { + return closed.get(); + } + + @VisibleForTesting + boolean isDrained() { + return drained; + } + + @VisibleForTesting + int openRetries() { + return openRetries; + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerTest.java new file mode 100644 index 0000000000..75f60e664e --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.cluster.Cluster; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidScanRequestException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; +import org.apache.fluss.exception.ScannerExpiredException; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.exception.UnknownScannerIdException; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.rpc.TestingTabletGatewayService; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Protocol-level unit tests for {@link KvBatchScanner} against a recording fake gateway. 
*/ +class KvBatchScannerTest { + + private static final TableBucket BUCKET_0 = new TableBucket(DATA1_TABLE_ID_PK, 0); + private static final byte[] SCANNER_ID = new byte[] {1, 2, 3, 4}; + private static final Duration POLL_TIMEOUT = Duration.ofSeconds(5); + private static final SchemaGetter SCHEMA_GETTER = + new TestingSchemaGetter((short) 1, DATA1_SCHEMA_PK); + + @Test + void firstPollOpensScannerAndDoesNotIncludeCallSeqId() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + assertThat(scanner.pollBatch(POLL_TIMEOUT)).isNull(); + + ScanKvRequest open = gateway.requests.get(0); + assertThat(open.hasBucketScanReq()).isTrue(); + assertThat(open.hasScannerId()).isFalse(); + assertThat(open.hasCallSeqId()).isFalse(); + assertThat(open.getBucketScanReq().getTableId()).isEqualTo(DATA1_TABLE_ID_PK); + assertThat(open.getBucketScanReq().getBucketId()).isEqualTo(0); + } + } + + @Test + void continuationsUsePreIncrementedCallSeqIdStartingAtOne() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + for (int i = 0; i < 4; i++) { + scanner.pollBatch(POLL_TIMEOUT); + } + } + + assertThat(gateway.requests).hasSize(4); + assertThat(gateway.requests.get(0).hasCallSeqId()).isFalse(); + assertThat(gateway.requests.get(1).getCallSeqId()).isEqualTo(1); + assertThat(gateway.requests.get(2).getCallSeqId()).isEqualTo(2); + assertThat(gateway.requests.get(3).getCallSeqId()).isEqualTo(3); + } + + @Test + void continuationsCarryScannerIdFromFirstResponse() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + 
gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + scanner.pollBatch(POLL_TIMEOUT); + scanner.pollBatch(POLL_TIMEOUT); + } + + assertThat(gateway.requests.get(1).hasScannerId()).isTrue(); + assertThat(gateway.requests.get(1).getScannerId()).isEqualTo(SCANNER_ID); + } + + @Test + void emptyBucketReturnsNullOnFirstPoll() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + assertThat(scanner.pollBatch(POLL_TIMEOUT)).isNull(); + assertThat(scanner.pollBatch(POLL_TIMEOUT)).isNull(); + } + + assertThat(gateway.requests).hasSize(1); + } + + @Test + void pipelinesNextRequestImmediatelyAfterResponse() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + scanner.pollBatch(POLL_TIMEOUT); + assertThat(gateway.requests).hasSize(2); + } + } + + // ------------------------------------------------------------------------- + // close() discipline + // ------------------------------------------------------------------------- + + @Test + void closeIsIdempotent() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + KvBatchScanner scanner = newScanner(gateway); + scanner.pollBatch(POLL_TIMEOUT); + scanner.close(); + int after1 = gateway.requests.size(); + scanner.close(); + scanner.close(); + assertThat(gateway.requests).hasSize(after1); + assertThat(scanner.isClosed()).isTrue(); + } + + @Test + void closeAfterDrainedDoesNotSendCloseScanner() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + 
gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + assertThat(scanner.pollBatch(POLL_TIMEOUT)).isNull(); + assertThat(scanner.isDrained()).isTrue(); + } + + assertThat(gateway.requests).hasSize(1); + assertThat(gateway.requests.stream().anyMatch(KvBatchScannerTest::isCloseRequest)) + .isFalse(); + } + + @Test + void closeMidStreamSendsCloseScannerWithScannerId() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + + KvBatchScanner scanner = newScanner(gateway); + scanner.pollBatch(POLL_TIMEOUT); + scanner.close(); + + // open + pipelined continuation + close_scanner + assertThat(gateway.requests).hasSize(3); + ScanKvRequest closeReq = gateway.requests.get(2); + assertThat(isCloseRequest(closeReq)).isTrue(); + assertThat(closeReq.getScannerId()).isEqualTo(SCANNER_ID); + } + + // ------------------------------------------------------------------------- + // TOO_MANY_SCANNERS retry + // ------------------------------------------------------------------------- + + @Test + void tooManyScannersOnOpenRetriesUpToLimitThenSucceeds() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(emptyTerminalResponse(SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + assertThat(scanner.pollBatch(POLL_TIMEOUT)) + .isNotNull() + .satisfies(it -> assertThat(it.hasNext()).isFalse()); + assertThat(scanner.pollBatch(POLL_TIMEOUT)) + .isNotNull() + .satisfies(it -> assertThat(it.hasNext()).isFalse()); + assertThat(scanner.pollBatch(POLL_TIMEOUT)) + .isNotNull() + .satisfies(it -> assertThat(it.hasNext()).isFalse()); + 
assertThat(scanner.pollBatch(POLL_TIMEOUT)).isNull(); + assertThat(scanner.openRetries()).isEqualTo(3); + } + } + + @Test + void tooManyScannersOnOpenExhaustsRetriesAndFails() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + gateway.enqueue(errorResponse(Errors.TOO_MANY_SCANNERS)); + + KvBatchScanner scanner = newScanner(gateway); + scanner.pollBatch(POLL_TIMEOUT); + scanner.pollBatch(POLL_TIMEOUT); + scanner.pollBatch(POLL_TIMEOUT); + + assertThatThrownBy(() -> scanner.pollBatch(POLL_TIMEOUT)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(TooManyScannersException.class); + assertThat(scanner.isClosed()).isTrue(); + } + + // ------------------------------------------------------------------------- + // Other terminal errors + // ------------------------------------------------------------------------- + + @Test + void notLeaderOrFollowerRefreshesMetadataAndFails() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(errorResponse(Errors.NOT_LEADER_OR_FOLLOWER)); + + TestMetadataUpdater meta = new TestMetadataUpdater(gateway); + try (KvBatchScanner scanner = + new KvBatchScanner( + DATA1_TABLE_INFO_PK, BUCKET_0, SCHEMA_GETTER, meta, 4096, null)) { + assertThatThrownBy(() -> scanner.pollBatch(POLL_TIMEOUT)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(NotLeaderOrFollowerException.class); + + // open + post-error refresh + assertThat(meta.metadataRefreshes.get()).isEqualTo(2); + assertThat(scanner.isClosed()).isTrue(); + } + } + + @Test + void scannerExpiredIsTerminalAndDoesNotSendCloseScanner() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(errorResponse(Errors.SCANNER_EXPIRED)); + + try (KvBatchScanner 
scanner = newScanner(gateway)) { + scanner.pollBatch(POLL_TIMEOUT); + assertThatThrownBy(() -> scanner.pollBatch(POLL_TIMEOUT)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(ScannerExpiredException.class); + } + + assertThat(gateway.requests.stream().anyMatch(KvBatchScannerTest::isCloseRequest)) + .isFalse(); + } + + @Test + void unknownScannerIdIsTerminalAndDoesNotSendCloseScanner() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(errorResponse(Errors.UNKNOWN_SCANNER_ID)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + scanner.pollBatch(POLL_TIMEOUT); + assertThatThrownBy(() -> scanner.pollBatch(POLL_TIMEOUT)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(UnknownScannerIdException.class); + } + + assertThat(gateway.requests.stream().anyMatch(KvBatchScannerTest::isCloseRequest)) + .isFalse(); + } + + @Test + void invalidScanRequestIsTerminalAndSendsCloseScanner() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(emptyContinuationResponse(SCANNER_ID)); + gateway.enqueue(errorResponse(Errors.INVALID_SCAN_REQUEST)); + + try (KvBatchScanner scanner = newScanner(gateway)) { + scanner.pollBatch(POLL_TIMEOUT); + assertThatThrownBy(() -> scanner.pollBatch(POLL_TIMEOUT)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(InvalidScanRequestException.class); + } + + assertThat(gateway.requests.stream().anyMatch(KvBatchScannerTest::isCloseRequest)).isTrue(); + } + + // ------------------------------------------------------------------------- + // Timeout + // ------------------------------------------------------------------------- + + @Test + void timeoutReturnsEmptyIteratorAndKeepsFutureInFlight() throws Exception { + RecordingGateway gateway = new RecordingGateway(); + gateway.enqueue(neverCompleting()); + + KvBatchScanner scanner = newScanner(gateway); + CloseableIterator first = 
scanner.pollBatch(Duration.ofMillis(50)); + assertThat(first).isNotNull(); + assertThat(first.hasNext()).isFalse(); + + assertThat(gateway.requests).hasSize(1); + scanner.close(); + } + + // ------------------------------------------------------------------------- + // Test helpers + // ------------------------------------------------------------------------- + + private KvBatchScanner newScanner(RecordingGateway gateway) { + return new KvBatchScanner( + DATA1_TABLE_INFO_PK, + BUCKET_0, + SCHEMA_GETTER, + new TestMetadataUpdater(gateway), + 4096, + null); + } + + private static ScanKvResponse emptyContinuationResponse(byte[] scannerId) { + return new ScanKvResponse().setScannerId(scannerId).setHasMoreResults(true); + } + + private static ScanKvResponse emptyTerminalResponse(byte[] scannerId) { + return new ScanKvResponse() + .setScannerId(scannerId) + .setHasMoreResults(false) + .setLogOffset(0L); + } + + private static ScanKvResponse errorResponse(Errors error) { + return new ScanKvResponse() + .setErrorCode(error.code()) + .setErrorMessage(error.exception().getMessage()); + } + + private static CompletableFuture neverCompleting() { + return new CompletableFuture<>(); + } + + private static boolean isCloseRequest(ScanKvRequest req) { + return req.hasCloseScanner() && req.isCloseScanner(); + } + + private static final class RecordingGateway extends TestingTabletGatewayService { + final List requests = new ArrayList<>(); + private final Queue> queued = new LinkedList<>(); + + void enqueue(ScanKvResponse response) { + queued.add(CompletableFuture.completedFuture(response)); + } + + void enqueue(CompletableFuture future) { + queued.add(future); + } + + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + requests.add(request); + if (request.hasCloseScanner() && request.isCloseScanner()) { + return CompletableFuture.completedFuture( + new ScanKvResponse().setHasMoreResults(false)); + } + CompletableFuture next = queued.poll(); + if (next == null) { + 
CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new AssertionError( + "RecordingGateway received an unexpected request (no response queued): " + + request)); + return failed; + } + return next; + } + } + + private static final class TestMetadataUpdater extends MetadataUpdater { + private final TabletServerGateway gateway; + final AtomicInteger metadataRefreshes = new AtomicInteger(); + + TestMetadataUpdater(TabletServerGateway gateway) { + super(null, new Configuration(), Cluster.empty()); + this.gateway = gateway; + } + + @Override + public void checkAndUpdateMetadata(TablePath tablePath, TableBucket tableBucket) { + metadataRefreshes.incrementAndGet(); + } + + @Override + public int leaderFor(TablePath tablePath, TableBucket tableBucket) { + return 0; + } + + @Override + public @Nullable TabletServerGateway newTabletServerClientForNode(int serverId) { + return gateway; + } + } +} From 48278ceac4b3039028b41a803fc162995a9128ca Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sat, 9 May 2026 21:08:51 +0300 Subject: [PATCH 3/7] [client] Update Scan and TableScan --- .../fluss/client/table/scanner/Scan.java | 10 ++++-- .../fluss/client/table/scanner/TableScan.java | 32 ++++++++++++++++++- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java index 233d2d358d..11ab0aaa98 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java @@ -87,9 +87,10 @@ public interface Scan { TypedLogScanner createTypedLogScanner(Class pojoClass); /** - * Creates a {@link BatchScanner} to read current data in the given table bucket for this scan. + * Creates a {@link BatchScanner} to read current data in the given table bucket. * - *

<p>Note: this API doesn't support pre-configured with {@link #project}. + * <p>
For Primary Key Tables, this performs a full RocksDB-backed KV scan of the bucket and does + * not require {@link #limit(int)}. For Log Tables, {@link #limit(int)} must be set. */ BatchScanner createBatchScanner(TableBucket tableBucket); @@ -102,6 +103,9 @@ public interface Scan { */ BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId); - /** Creates a {@link BatchScanner} to read current data in the given table for this scan. */ + /** + * Creates a {@link BatchScanner} that scans across all buckets of the table, expanding all + * partitions for partitioned tables. For Log Tables, {@link #limit(int)} must be set. + */ BatchScanner createBatchScanner() throws IOException; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index 6502b7946f..cff4ec327c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.scanner.batch.CompositeBatchScanner; +import org.apache.fluss.client.table.scanner.batch.KvBatchScanner; import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner; import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; @@ -38,6 +39,9 @@ import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -47,6 +51,8 @@ /** API for configuring and creating {@link LogScanner} and {@link BatchScanner}. 
*/ public class TableScan implements Scan { + private static final Logger LOG = LoggerFactory.getLogger(TableScan.class); + private final FlussConnection conn; private final TableInfo tableInfo; private final SchemaGetter schemaGetter; @@ -159,10 +165,19 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) { "BatchScanner doesn't support filter pushdown. Table: %s, bucket: %s", tableInfo.getTablePath(), tableBucket)); } + if (tableInfo.hasPrimaryKey() && limit == null) { + return new KvBatchScanner( + tableInfo, + tableBucket, + schemaGetter, + conn.getMetadataUpdater(), + kvBatchSizeBytes(), + projectedColumns); + } if (limit == null) { throw new UnsupportedOperationException( String.format( - "Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s", + "BatchScanner over a Log Table requires limit to be set. Table: %s, bucket: %s", tableInfo.getTablePath(), tableBucket)); } return new LimitBatchScanner( @@ -174,6 +189,21 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) { limit); } + private int kvBatchSizeBytes() { + long bytes = + conn.getConfiguration() + .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES) + .getBytes(); + if (bytes > Integer.MAX_VALUE) { + LOG.warn( + "{} ({} bytes) exceeds Integer.MAX_VALUE; capping at Integer.MAX_VALUE.", + ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES.key(), + bytes); + return Integer.MAX_VALUE; + } + return (int) bytes; + } + @Override public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) { if (recordBatchFilter != null) { From 2ba515e49040073908f70e64263237052cef1168 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sat, 9 May 2026 21:33:45 +0300 Subject: [PATCH 4/7] [client] add end2end integration tests - TableKvScanITCase --- .../fluss/client/table/TableKvScanITCase.java | 356 ++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java diff 
--git a/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java new file mode 100644 index 0000000000..a1a305033b --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table; + +import org.apache.fluss.client.admin.ClientToServerITCaseBase; +import org.apache.fluss.client.table.scanner.batch.BatchScanUtils; +import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.fluss.testutils.DataTestUtils.compactedRow; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** End-to-end IT case for {@link org.apache.fluss.client.table.scanner.batch.KvBatchScanner}. 
*/ +class TableKvScanITCase extends ClientToServerITCaseBase { + + private static final String DB = "test-kv-scan-db"; + + private static final Schema PK_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + private static final RowType PK_ROW_TYPE = PK_SCHEMA.getRowType(); + + private static final int BUCKETS = 3; + + private static final TableDescriptor PK_TABLE = + TableDescriptor.builder().schema(PK_SCHEMA).distributedBy(BUCKETS, "id").build(); + + @Test + void wholeTableScanReturnsAllRows() throws Exception { + TablePath tablePath = TablePath.of(DB, "whole-table-scan"); + long tableId = createTable(tablePath, PK_TABLE, true); + waitAllReplicasReady(tableId, BUCKETS); + + try (Table table = conn.getTable(tablePath)) { + int rowCount = 50; + upsertRows(table, rowCount); + + try (BatchScanner scanner = table.newScan().createBatchScanner()) { + List rows = BatchScanUtils.collectRows(scanner); + assertThat(rows).hasSize(rowCount); + assertThat(idsOf(rows)).containsExactlyInAnyOrderElementsOf(rangeAsList(rowCount)); + } + } + } + + @Test + void singleBucketScanReturnsOnlyThatBucketsRows() throws Exception { + TablePath tablePath = TablePath.of(DB, "single-bucket-scan"); + long tableId = createTable(tablePath, PK_TABLE, true); + waitAllReplicasReady(tableId, BUCKETS); + + try (Table table = conn.getTable(tablePath)) { + int rowCount = 30; + upsertRows(table, rowCount); + + int bucketCount = table.getTableInfo().getNumBuckets(); + int totalAcrossBuckets = 0; + for (int b = 0; b < bucketCount; b++) { + try (BatchScanner scanner = + table.newScan().createBatchScanner(new TableBucket(tableId, b))) { + totalAcrossBuckets += BatchScanUtils.collectRows(scanner).size(); + } + } + assertThat(totalAcrossBuckets).isEqualTo(rowCount); + } + } + + @Test + void emptyTableReturnsNoRows() throws Exception { + TablePath tablePath = TablePath.of(DB, "empty-table-scan"); + long tableId = 
createTable(tablePath, PK_TABLE, true); + waitAllReplicasReady(tableId, BUCKETS); + + try (Table table = conn.getTable(tablePath); + BatchScanner scanner = table.newScan().createBatchScanner()) { + assertThat(BatchScanUtils.collectRows(scanner)).isEmpty(); + } + } + + @Test + void snapshotIsolationHidesPostOpenWrites() throws Exception { + // Single bucket so the first poll opens *the* bucket scanner and pins its snapshot; + // multi-bucket scanners open buckets lazily, which would race the post-open write. + TablePath tablePath = TablePath.of(DB, "snapshot-isolation-scan"); + TableDescriptor singleBucket = + TableDescriptor.builder().schema(PK_SCHEMA).distributedBy(1, "id").build(); + long tableId = createTable(tablePath, singleBucket, true); + waitAllReplicasReady(tableId, 1); + + try (Table table = conn.getTable(tablePath)) { + int initialRows = 20; + upsertRows(table, initialRows); + + try (BatchScanner scanner = table.newScan().createBatchScanner()) { + List firstBatch = drainOnePoll(scanner); + + upsertRow(table, initialRows, "post-snapshot"); + + List remaining = BatchScanUtils.collectRows(scanner); + List all = new ArrayList<>(firstBatch); + all.addAll(remaining); + + assertThat(all).hasSize(initialRows); + assertThat(idsOf(all)).doesNotContain(initialRows); + } + } + } + + @Test + void projectionAppliedClientSide() throws Exception { + TablePath tablePath = TablePath.of(DB, "projection-scan"); + long tableId = createTable(tablePath, PK_TABLE, true); + waitAllReplicasReady(tableId, BUCKETS); + + try (Table table = conn.getTable(tablePath)) { + int rowCount = 10; + upsertRows(table, rowCount); + + try (BatchScanner scanner = + table.newScan().project(new int[] {1}).createBatchScanner()) { + List projected = BatchScanUtils.collectRows(scanner); + assertThat(projected).hasSize(rowCount); + Set names = new HashSet<>(); + for (InternalRow row : projected) { + assertThat(row.getFieldCount()).isEqualTo(1); + names.add(row.getString(0).toString()); + } + 
assertThat(names).hasSize(rowCount); + } + } + } + + @Test + void logTableWithoutLimitFails() throws Exception { + TablePath tablePath = TablePath.of(DB, "log-table-needs-limit"); + Schema logSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .build(); + TableDescriptor logTable = + TableDescriptor.builder().schema(logSchema).distributedBy(1).build(); + createTable(tablePath, logTable, true); + + try (Table table = conn.getTable(tablePath)) { + assertThatThrownBy(() -> table.newScan().createBatchScanner()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Log Table"); + } + } + + @Test + void partitionedPrimaryKeyTableScanCoversAllPartitions() throws Exception { + TablePath tablePath = TablePath.of(DB, "partitioned-pk-scan"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.STRING()) + .primaryKey("a", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .partitionedBy("c") + .distributedBy(2, "a") + .build(); + createTable(tablePath, descriptor, false); + + for (int i = 0; i < 3; i++) { + admin.createPartition(tablePath, newPartitionSpec("c", "p" + i), false).get(); + } + List partitions = admin.listPartitionInfos(tablePath).get(); + assertThat(partitions).hasSize(3); + + int rowsPerPartition = 4; + try (Table table = conn.getTable(tablePath)) { + UpsertWriter writer = table.newUpsert().createWriter(); + for (PartitionInfo partition : partitions) { + String name = partition.getPartitionName(); + for (int j = 0; j < rowsPerPartition; j++) { + writer.upsert( + compactedRow(schema.getRowType(), new Object[] {j, "v" + j, name})); + } + } + writer.flush(); + + try (BatchScanner scanner = table.newScan().createBatchScanner()) { + List rows = BatchScanUtils.collectRows(scanner); + assertThat(rows).hasSize(partitions.size() * rowsPerPartition); + Set partitionsSeen = 
new HashSet<>(); + for (InternalRow row : rows) { + partitionsSeen.add(row.getString(2).toString()); + } + assertThat(partitionsSeen) + .containsExactlyInAnyOrderElementsOf( + partitions.stream() + .map(PartitionInfo::getPartitionName) + .collect(Collectors.toSet())); + } + } + } + + @Test + void partitionedTablePerBucketScansCoverAllPartitionBuckets() throws Exception { + TablePath tablePath = TablePath.of(DB, "partitioned-per-bucket-scan"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.STRING()) + .primaryKey("a", "c") + .build(); + int bucketsPerPartition = 2; + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .partitionedBy("c") + .distributedBy(bucketsPerPartition, "a") + .build(); + long tableId = createTable(tablePath, descriptor, false); + + for (int i = 0; i < 3; i++) { + admin.createPartition(tablePath, newPartitionSpec("c", "p" + i), false).get(); + } + List partitions = admin.listPartitionInfos(tablePath).get(); + assertThat(partitions).hasSize(3); + + int rowsPerPartition = 8; + try (Table table = conn.getTable(tablePath)) { + UpsertWriter writer = table.newUpsert().createWriter(); + for (PartitionInfo partition : partitions) { + String name = partition.getPartitionName(); + for (int j = 0; j < rowsPerPartition; j++) { + writer.upsert( + compactedRow(schema.getRowType(), new Object[] {j, "v" + j, name})); + } + } + writer.flush(); + + // Iterate every (partition, bucket) combination and sum per-partition rows. 
+ for (PartitionInfo partition : partitions) { + long partitionId = partition.getPartitionId(); + int rowsForThisPartition = 0; + for (int b = 0; b < bucketsPerPartition; b++) { + TableBucket tb = new TableBucket(tableId, partitionId, b); + try (BatchScanner scanner = table.newScan().createBatchScanner(tb)) { + List bucketRows = BatchScanUtils.collectRows(scanner); + for (InternalRow row : bucketRows) { + assertThat(row.getString(2).toString()) + .isEqualTo(partition.getPartitionName()); + } + rowsForThisPartition += bucketRows.size(); + } + } + assertThat(rowsForThisPartition).isEqualTo(rowsPerPartition); + } + } + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private static void upsertRows(Table table, int count) throws Exception { + UpsertWriter writer = table.newUpsert().createWriter(); + for (int i = 0; i < count; i++) { + writer.upsert(compactedRow(PK_ROW_TYPE, new Object[] {i, "v" + i})); + } + writer.flush(); + } + + private static void upsertRow(Table table, int id, String name) throws Exception { + UpsertWriter writer = table.newUpsert().createWriter(); + writer.upsert(compactedRow(PK_ROW_TYPE, new Object[] {id, name})); + writer.flush(); + } + + /** Polls until at least one non-empty batch is consumed, or the scanner is drained. 
*/ + private static List drainOnePoll(BatchScanner scanner) throws Exception { + Duration timeout = Duration.ofSeconds(10); + long deadlineNanos = System.nanoTime() + timeout.toNanos(); + while (System.nanoTime() < deadlineNanos) { + CloseableIterator it = scanner.pollBatch(Duration.ofMillis(200)); + if (it == null) { + return new ArrayList<>(); + } + try { + List rows = new ArrayList<>(); + while (it.hasNext()) { + rows.add(it.next()); + } + if (!rows.isEmpty()) { + return rows; + } + } finally { + it.close(); + } + } + throw new AssertionError("scanner produced no batch within " + timeout); + } + + private static List idsOf(List rows) { + List ids = new ArrayList<>(rows.size()); + for (InternalRow row : rows) { + ids.add(row.getInt(0)); + } + return ids; + } + + private static List rangeAsList(int endExclusive) { + List out = new ArrayList<>(endExclusive); + for (int i = 0; i < endExclusive; i++) { + out.add(i); + } + return out; + } +} From e7e55bf31c8cb2effc58d7f4936d9d9d8d421658 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sat, 9 May 2026 21:48:23 +0300 Subject: [PATCH 5/7] [docs] Document full PK table scan --- .../fluss/client/table/TableKvScanITCase.java | 27 +++++++++ website/docs/apis/java-client.md | 56 +++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java index a1a305033b..3161a053f0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/TableKvScanITCase.java @@ -35,6 +35,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -168,6 +169,32 @@ void projectionAppliedClientSide() throws Exception { } } + @Test + void projectionByColumnNamesAppliedClientSide() throws 
Exception { + TablePath tablePath = TablePath.of(DB, "projection-by-name-scan"); + long tableId = createTable(tablePath, PK_TABLE, true); + waitAllReplicasReady(tableId, BUCKETS); + + try (Table table = conn.getTable(tablePath)) { + int rowCount = 10; + upsertRows(table, rowCount); + + try (BatchScanner scanner = + table.newScan() + .project(Collections.singletonList("name")) + .createBatchScanner()) { + List projected = BatchScanUtils.collectRows(scanner); + assertThat(projected).hasSize(rowCount); + Set names = new HashSet<>(); + for (InternalRow row : projected) { + assertThat(row.getFieldCount()).isEqualTo(1); + names.add(row.getString(0).toString()); + } + assertThat(names).hasSize(rowCount); + } + } + } + @Test void logTableWithoutLimitFails() throws Exception { TablePath tablePath = TablePath.of(DB, "log-table-needs-limit"); diff --git a/website/docs/apis/java-client.md b/website/docs/apis/java-client.md index aa5c75e9d6..4447fe55b8 100644 --- a/website/docs/apis/java-client.md +++ b/website/docs/apis/java-client.md @@ -243,6 +243,62 @@ while (true) { } ``` +### Batch Scan — Full Primary Key Table + +For Primary Key tables, `BatchScanner` reads every live row in the table once +and stops. The scan is served from a point-in-time RocksDB snapshot on the +tablet server, so all rows reflect the KV state at the moment the scan was +opened — concurrent writes are invisible to a running scan. + +This is the building block for periodic full-state reads such as: + +- **Dashboards / materialised views** — refresh a derived view by re-reading + the entire current state of a primary key table on a schedule. +- **Cache or search-index warm-up** — load every row of a PK table into an + external system at startup, with a consistent snapshot guarantee. +- **Bulk export to OLAP / data lake** — periodic full-snapshot of the table + for downstream analytics, complementing the streaming changelog read via + `LogScanner`. 
+ +`createBatchScanner()` (no arguments) scans the whole table; for partitioned +tables it expands across all partitions × buckets automatically. Use +`createBatchScanner(TableBucket)` to scan a single bucket — useful when +distributing scan work across workers in a parallel engine. + +```java +try (BatchScanner scanner = table.newScan().createBatchScanner()) { + while (true) { + CloseableIterator batch = scanner.pollBatch(Duration.ofSeconds(5)); + if (batch == null) { + break; // end of scan + } + try { + while (batch.hasNext()) { + InternalRow row = batch.next(); + // process row + } + } finally { + batch.close(); + } + } +} +``` + +You can also restrict the columns returned via `project(...)`: + +```java +try (BatchScanner scanner = table.newScan() + .project(new int[] {0, 2}) // or .project(Arrays.asList("id", "status")) + .createBatchScanner()) { + // ... +} +``` + +The per-RPC payload size is controlled by +`client.scanner.kv.fetch.max-bytes` (default `4mb`); the server caps this +further via `kv.scanner.max-batch-size`. Note that `BatchScanner` is not +thread-safe — do not share an instance across threads. + ### Lookup You can also use the Fluss API to perform lookups on a table. This is useful for querying specific records based on their primary key or prefix key. 
```java From 51cbd59c3e2ea8ef567a3ddbdcff7ecb0f909c56 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sun, 10 May 2026 08:37:39 +0300 Subject: [PATCH 6/7] [client] remove redundant logging --- .../fluss/client/table/scanner/TableScan.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index cff4ec327c..51f12abc7c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -39,9 +39,6 @@ import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; @@ -51,7 +48,6 @@ /** API for configuring and creating {@link LogScanner} and {@link BatchScanner}. 
*/ public class TableScan implements Scan { - private static final Logger LOG = LoggerFactory.getLogger(TableScan.class); private final FlussConnection conn; private final TableInfo tableInfo; @@ -190,18 +186,11 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) { } private int kvBatchSizeBytes() { - long bytes = + // Clamp before narrowing: a value above Integer.MAX_VALUE would wrap to a negative int. + return (int) Math.min(Integer.MAX_VALUE, conn.getConfiguration() .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES) - .getBytes(); - if (bytes > Integer.MAX_VALUE) { - LOG.warn( - "{} ({} bytes) exceeds Integer.MAX_VALUE; capping at Integer.MAX_VALUE.", - ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES.key(), - bytes); - return Integer.MAX_VALUE; - } - return (int) bytes; + .getBytes()); } @Override From f2f01a7b2770d6e216d9c07b381a55aaf9d24f9d Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sun, 10 May 2026 08:42:33 +0300 Subject: [PATCH 7/7] address co-pilot comments --- .../table/scanner/batch/KvBatchScanner.java | 4 ++-- website/docs/apis/java-client.md | 21 ++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java index 0245295aed..5a99d88b21 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java @@ -64,7 +64,7 @@ * that point are invisible. * *

The next continuation RPC is issued immediately after each response so the caller's - * deserialise/process work overlaps the network round-trip; one in-flight RPC at a time. + * deserialize/process work overlaps the network round-trip; one in-flight RPC at a time. */ @Internal @NotThreadSafe @@ -188,7 +188,7 @@ private void openScanner() { } private void sendContinuation() { - // Pre-increment so the first continuation sends call_seq_id=1; the server initialises its + // Pre-increment so the first continuation sends call_seq_id=1; the server initializes its // counter to 0 and expects requestSeqId == contextSeqId + 1. ScanKvRequest request = new ScanKvRequest() diff --git a/website/docs/apis/java-client.md b/website/docs/apis/java-client.md index 4447fe55b8..eeb169f815 100644 --- a/website/docs/apis/java-client.md +++ b/website/docs/apis/java-client.md @@ -246,16 +246,27 @@ while (true) { ### Batch Scan — Full Primary Key Table For Primary Key tables, `BatchScanner` reads every live row in the table once -and stops. The scan is served from a point-in-time RocksDB snapshot on the -tablet server, so all rows reflect the KV state at the moment the scan was -opened — concurrent writes are invisible to a running scan. +and stops. Each bucket is served from a point-in-time RocksDB snapshot on its +tablet server: rows from a given bucket reflect the KV state at the moment +that bucket's scan was opened, and writes to that bucket after the open are +invisible to the running scan. + +:::caution Snapshot boundary is per-bucket +For multi-bucket and partitioned tables, `createBatchScanner()` opens each +bucket's snapshot independently the first time it is polled. There is **no +single global table-wide point-in-time view** across all buckets — concurrent +writes can land in buckets whose scanner has not yet opened. 
If you need a +strict table-wide snapshot, use the named-snapshot path: +`createBatchScanner(TableBucket, long snapshotId)` against a snapshot id +returned by `Admin#getLatestKvSnapshots(...)`. +::: This is the building block for periodic full-state reads such as: -- **Dashboards / materialised views** — refresh a derived view by re-reading +- **Dashboards / materialized views** — refresh a derived view by re-reading the entire current state of a primary key table on a schedule. - **Cache or search-index warm-up** — load every row of a PK table into an - external system at startup, with a consistent snapshot guarantee. + external system at startup, with a per-bucket consistent snapshot guarantee. - **Bulk export to OLAP / data lake** — periodic full-snapshot of the table for downstream analytics, complementing the streaming changelog read via `LogScanner`.