From 4724f9ead19aede3024bf36fdb13a1cc0fd608e8 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 26 Mar 2026 11:38:57 +0800 Subject: [PATCH 1/5] [lake/tiering] add bytes written tracking and metrics for tiering service --- .../table/scanner/log/LogFetchCollector.java | 4 +- .../client/table/scanner/log/LogFetcher.java | 3 +- .../table/scanner/log/LogScannerImpl.java | 21 +++--- .../scanner/log/LogFetchCollectorTest.java | 18 ++--- .../table/scanner/log/LogFetcherITCase.java | 26 +++---- .../apache/fluss/lake/writer/LakeWriter.java | 9 +++ .../org/apache/fluss/metrics/MetricNames.java | 8 +++ .../tiering/source/TieringSourceReader.java | 25 +++++-- .../tiering/source/TieringSplitReader.java | 13 +++- .../source/metrics/TieringMetrics.java | 72 +++++++++++++++++++ .../iceberg/tiering/IcebergLakeWriter.java | 12 ++++ .../lake/paimon/tiering/PaimonLakeWriter.java | 15 ++++ 12 files changed, 179 insertions(+), 47 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 34a3ec86e6..e690eed8a1 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -83,7 +83,7 @@ public LogFetchCollector( * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ - public Map> collectFetch(final LogFetchBuffer logFetchBuffer) { + public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { Map> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; @@ -143,7 +143,7 @@ public Map> collectFetch(final LogFetchBuffer logF } } - return fetched; + return new 
ScanRecords(fetched); } private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..c8f87984b0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -22,7 +22,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -161,7 +160,7 @@ public boolean hasAvailableFetches() { return !logFetchBuffer.isEmpty(); } - public Map> collectFetch() { + public ScanRecords collectFetch() { return logFetchCollector.collectFetch(logFetchBuffer); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 33181bd7ae..9c24dbeb77 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -21,7 +21,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.WakeupException; import org.apache.fluss.metadata.SchemaGetter; @@ -41,8 +40,6 @@ import java.time.Duration; import 
java.util.Collections; import java.util.ConcurrentModificationException; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -141,17 +138,17 @@ public ScanRecords poll(Duration timeout) { long timeoutNanos = timeout.toNanos(); long startNanos = System.nanoTime(); do { - Map> fetchRecords = pollForFetches(); - if (fetchRecords.isEmpty()) { + ScanRecords scanRecords = pollForFetches(); + if (scanRecords.isEmpty()) { try { if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { // logFetcher waits for the timeout and no data in buffer, // so we return empty - return new ScanRecords(fetchRecords); + return scanRecords; } } catch (WakeupException e) { // wakeup() is called, we need to return empty - return new ScanRecords(fetchRecords); + return scanRecords; } } else { // before returning the fetched records, we can send off the next round of @@ -159,7 +156,7 @@ public ScanRecords poll(Duration timeout) { // while the user is handling the fetched records. logFetcher.sendFetches(); - return new ScanRecords(fetchRecords); + return scanRecords; } } while (System.nanoTime() - startNanos < timeoutNanos); @@ -247,10 +244,10 @@ public void wakeup() { logFetcher.wakeup(); } - private Map> pollForFetches() { - Map> fetchedRecords = logFetcher.collectFetch(); - if (!fetchedRecords.isEmpty()) { - return fetchedRecords; + private ScanRecords pollForFetches() { + ScanRecords scanRecords = logFetcher.collectFetch(); + if (!scanRecords.isEmpty()) { + return scanRecords; } // send any new fetches (won't resend pending fetches). 
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index 99a108e2f3..471da0747a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -19,7 +19,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metadata.TestingMetadataUpdater; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.record.LogRecordReadContext; @@ -31,7 +30,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.fluss.record.TestData.DATA1; @@ -97,10 +95,9 @@ void testNormal() throws Exception { assertThat(completedFetch.isInitialized()).isFalse(); // Fetch the data and validate that we get all the records we want back. - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(1); - assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(1); + assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10); // When we collected the data from the buffer, this will cause the completed fetch to get // initialized. @@ -122,7 +119,7 @@ void testNormal() throws Exception { // Now attempt to collect more records from the fetch buffer. 
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); } @Test @@ -147,14 +144,13 @@ void testCollectAfterUnassign() throws Exception { // unassign bucket 2 logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2)); - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); // should only contain records for bucket 1 - assertThat(bucketAndRecords.keySet()).containsExactly(tb1); + assertThat(bucketAndRecords.buckets()).containsExactly(tb1); // collect again, should be empty bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); } private DefaultCompletedFetch makeCompletedFetch( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java index 73c9b40fb6..dca64fb542 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java @@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - List scanRecords = records.get(tb0); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + List scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); @@ -193,9 +193,9 @@ void 
testFetchWithSchemaChange() throws Exception { assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2); }); records = newSchemaLogFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0)).hasSize(20); - scanRecords = records.get(tb0); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0)).hasSize(20); + scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); newSchemaLogFetcher.close(); @@ -226,10 +226,10 @@ void testFetch() throws Exception { assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(2); - assertThat(records.get(tb0).size()).isEqualTo(10); - assertThat(records.get(tb1).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(2); + assertThat(records.records(tb0).size()).isEqualTo(10); + assertThat(records.records(tb1).size()).isEqualTo(10); // after collect fetch, the fetcher is empty. 
assertThat(logFetcher.hasAvailableFetches()).isFalse(); @@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0).size()).isEqualTo(10); } @Test diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java index c3ab3a352a..972949cb7b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java @@ -47,4 +47,13 @@ public interface LakeWriter extends Closeable { * @throws IOException if an I/O error occurs */ WriteResult complete() throws IOException; + + /** + * Returns the total bytes written to the lake storage in this write session. Computed from + * actual physical file sizes after {@link #complete()} is called. Returns -1 by default for + * backward compatibility with third-party implementations. 
+ */ + default long getBytesWritten() { + return -1L; + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b11c3b92fc..e5baff6d70 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -280,4 +280,12 @@ public class MetricNames { public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond"; public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS = "numHugeAllocationsPerSecond"; + + // -------------------------------------------------------------------------------------------- + // metrics for tiering service + // -------------------------------------------------------------------------------------------- + + // for lake tiering metrics - operator level + public static final String TIERING_SERVICE_WRITTEN_BYTES = "writtenBytes"; + public static final String TIERING_SERVICE_WRITTEN_BYTES_RATE = "writtenBytesPerSecond"; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index eae584efed..6f0fc43b95 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import 
org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -73,17 +74,31 @@ public TieringSourceReader( Duration pollTimeout) { super( elementsQueue, - new TieringSourceFetcherManager<>( - elementsQueue, - () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout), - context.getConfiguration(), - (ignore) -> {}), + createFetcherManager( + elementsQueue, context, connection, lakeTieringFactory, pollTimeout), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); this.connection = connection; } + private static TieringSourceFetcherManager createFetcherManager( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { + TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); + return new TieringSourceFetcherManager<>( + elementsQueue, + () -> + new TieringSplitReader<>( + connection, lakeTieringFactory, pollTimeout, tieringMetrics), + context.getConfiguration(), + (ignore) -> {}); + } + @Override public void start() { // we request a split only if we did not get splits during the checkpoint restore diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 1e530d69a3..0d37c80148 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; +import 
org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -106,16 +107,20 @@ public class TieringSplitReader private final Set currentEmptySplits; + @Nullable private final TieringMetrics tieringMetrics; + + /** Creates a split reader without metrics. Use the 4-argument constructor in production. */ public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory) { - this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, null); } @VisibleForTesting protected TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, - Duration pollTimeout) { + Duration pollTimeout, + @Nullable TieringMetrics tieringMetrics) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -129,6 +134,7 @@ protected TieringSplitReader( this.currentPendingSnapshotSplits = new ArrayDeque<>(); this.reachTieringMaxDurationTables = new HashSet<>(); this.pollTimeout = pollTimeout; + this.tieringMetrics = tieringMetrics; } @Override @@ -436,6 +442,9 @@ private TableBucketWriteResult completeLakeWriter( WriteResult writeResult = null; if (lakeWriter != null) { writeResult = lakeWriter.complete(); + if (tieringMetrics != null) { + tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten()); + } lakeWriter.close(); } return toTableBucketWriteResult( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java new file mode 100644 index 0000000000..c5672b1584 --- /dev/null +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source.metrics; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metrics.MetricNames; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; + +/** + * A collection class for handling metrics in Tiering source reader. + * + *

All metrics are registered under group "fluss.tieringService", which is a child group of + * {@link org.apache.flink.metrics.groups.OperatorMetricGroup}. + * + *

The following metrics are available: + * + *

    + *
  • {@code fluss.tieringService.readBytes} - Counter: cumulative bytes read from Fluss since + * the job started. + *
  • {@code fluss.tieringService.readBytesPerSecond} - Meter: bytes-per-second rate derived from + * the counter using a 60-second sliding window. + *
  • {@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes written to the lake + * since the job started (physical file sizes after compression/encoding). + *
  • {@code fluss.tieringService.writtenBytesPerSecond} - Meter: write bytes-per-second rate + * derived from the counter using a 60-second sliding window. + *
+ */ +@Internal +public class TieringMetrics { + + // Metric group names + public static final String FLUSS_METRIC_GROUP = "fluss"; + public static final String TIERING_SERVICE_GROUP = "tieringService"; + + private final Counter writtenBytesCounter; + + public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) { + MetricGroup tieringJobGroup = + sourceReaderMetricGroup + .addGroup(FLUSS_METRIC_GROUP) + .addGroup(TIERING_SERVICE_GROUP); + + this.writtenBytesCounter = + tieringJobGroup.counter(MetricNames.TIERING_SERVICE_WRITTEN_BYTES); + tieringJobGroup.meter( + MetricNames.TIERING_SERVICE_WRITTEN_BYTES_RATE, new MeterView(writtenBytesCounter)); + } + + /** Records bytes written to the lake. Called once per bucket completion. */ + public void recordBytesWritten(long bytes) { + writtenBytesCounter.inc(bytes); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java index 227112eae8..32c678ed05 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java @@ -28,6 +28,7 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; @@ -43,6 +44,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -59,6 +61,7 @@ public class IcebergLakeWriter implements LakeWriter { private final Catalog icebergCatalog; private final Table icebergTable; private 
final RecordWriter recordWriter; + private long bytesWritten = 0L; @Nullable private final ExecutorService compactionExecutor; @Nullable private CompletableFuture compactionFuture; @@ -111,6 +114,10 @@ public void write(LogRecord record) throws IOException { public IcebergWriteResult complete() throws IOException { try { WriteResult writeResult = recordWriter.complete(); + bytesWritten = + Arrays.stream(writeResult.dataFiles()) + .mapToLong(DataFile::fileSizeInBytes) + .sum(); RewriteDataFileResult rewriteDataFileResult = null; if (compactionFuture != null) { @@ -122,6 +129,11 @@ public IcebergWriteResult complete() throws IOException { } } + @Override + public long getBytesWritten() { + return bytesWritten; + } + @Override public void close() throws IOException { try { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8472b825b0..2b23991a94 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -26,8 +26,10 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import java.io.IOException; import java.util.Collections; @@ -41,6 +43,7 @@ public class PaimonLakeWriter implements LakeWriter { private final Catalog paimonCatalog; private final RecordWriter recordWriter; + private long bytesWritten = 0L; public PaimonLakeWriter( PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext) @@ -84,9 +87,21 @@ public PaimonWriteResult complete() throws IOException { } catch (Exception e) { 
throw new IOException("Failed to complete Paimon write.", e); } + if (commitMessage instanceof CommitMessageImpl) { + bytesWritten = + ((CommitMessageImpl) commitMessage) + .newFilesIncrement().newFiles().stream() + .mapToLong(DataFileMeta::fileSize) + .sum(); + } return new PaimonWriteResult(commitMessage); } + @Override + public long getBytesWritten() { + return bytesWritten; + } + @Override public void close() throws IOException { try { From a77b0a38ede2ef6a8b44e527d19bff1e0407dd1c Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 26 Mar 2026 13:55:21 +0800 Subject: [PATCH 2/5] remove nullable metrics and update metric group naming --- .../flink/tiering/source/TieringSplitReader.java | 15 +++++++-------- .../tiering/source/metrics/TieringMetrics.java | 12 ++++-------- .../tiering/source/TieringSplitReaderTest.java | 10 +++++++++- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 0d37c80148..68ef3e137e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -107,12 +107,13 @@ public class TieringSplitReader private final Set currentEmptySplits; - @Nullable private final TieringMetrics tieringMetrics; + private final TieringMetrics tieringMetrics; - /** Creates a split reader without metrics. Use the 4-argument constructor in production. 
*/ public TieringSplitReader( - Connection connection, LakeTieringFactory lakeTieringFactory) { - this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, null); + Connection connection, + LakeTieringFactory lakeTieringFactory, + TieringMetrics tieringMetrics) { + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, tieringMetrics); } @VisibleForTesting @@ -120,7 +121,7 @@ protected TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, Duration pollTimeout, - @Nullable TieringMetrics tieringMetrics) { + TieringMetrics tieringMetrics) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -442,9 +443,7 @@ private TableBucketWriteResult completeLakeWriter( WriteResult writeResult = null; if (lakeWriter != null) { writeResult = lakeWriter.complete(); - if (tieringMetrics != null) { - tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten()); - } + tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten()); lakeWriter.close(); } return toTableBucketWriteResult( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java index c5672b1584..f7cc93f3aa 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java @@ -34,12 +34,8 @@ *

The following metrics are available: * *

    - *
  • {@code fluss.tieringService.readBytes} - Counter: cumulative bytes read from Fluss since - * the job started. - *
  • {@code fluss.tieringService.readBytesPerSecond} - Meter: bytes-per-second rate derived from - * the counter using a 60-second sliding window. *
  • {@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes written to the lake - * since the job started (physical file sizes after compression/encoding). + * since the job started (physical file sizes). *
  • {@code fluss.tieringService.writtenBytesPerSecond} - Meter: write bytes-per-second rate * derived from the counter using a 60-second sliding window. *
@@ -54,14 +50,14 @@ public class TieringMetrics { private final Counter writtenBytesCounter; public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) { - MetricGroup tieringJobGroup = + MetricGroup tieringServiceGroup = sourceReaderMetricGroup .addGroup(FLUSS_METRIC_GROUP) .addGroup(TIERING_SERVICE_GROUP); this.writtenBytesCounter = - tieringJobGroup.counter(MetricNames.TIERING_SERVICE_WRITTEN_BYTES); - tieringJobGroup.meter( + tieringServiceGroup.counter(MetricNames.TIERING_SERVICE_WRITTEN_BYTES); + tieringServiceGroup.meter( MetricNames.TIERING_SERVICE_WRITTEN_BYTES_RATE, new MeterView(writtenBytesCounter)); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index fdf7a84771..447ada7a7a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -26,6 +26,7 @@ import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -38,6 +39,8 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import 
org.junit.jupiter.api.Test; import java.io.IOException; @@ -277,7 +280,12 @@ void testTieringMixTables() throws Exception { } private TieringSplitReader createTieringReader(Connection connection) { - return new TieringSplitReader<>(connection, new TestingLakeTieringFactory()); + final TieringMetrics tieringMetrics = + new TieringMetrics( + InternalSourceReaderMetricGroup.mock( + new MetricListener().getMetricGroup())); + return new TieringSplitReader<>( + connection, new TestingLakeTieringFactory(), tieringMetrics); } private void verifyTieringRows( From 7bad18b275ae35aa0e00e76e85ca6f12a9ff104f Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 26 Mar 2026 14:56:27 +0800 Subject: [PATCH 3/5] add tiering service metrics documentation --- .../observability/monitor-metrics.md | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index ade4be9c61..a5c2598a08 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -1125,6 +1125,36 @@ How to Use Flink Metrics, you can see [Flink Metrics](https://nightlies.apache.o Table The output records per second. Meter - + + + + +### Tiering Service Metrics + +These metrics are exposed by the Flink-based tiering source reader when running the Lake Tiering Service. +All metrics are registered under the `fluss.tieringService` metric group, which is a child of the Flink `SourceReaderMetricGroup`. + + + + + + + + + + + + + + + + + + + + + + +
Metrics NameLevelDescriptionType
writtenBytesFlink Source OperatorThe cumulative bytes written to lake storage since the tiering job started. This measures the actual physical file sizes written to the lake (Iceberg, Paimon, etc.), not the logical record sizes. Lake writers that do not support byte tracking report -1 from getBytesWritten, so their writes are not meaningfully reflected in this counter.Counter
writtenBytesPerSecondFlink Source OperatorThe write throughput rate in bytes per second, derived from the writtenBytes counter using a 60-second sliding window.Meter
\ No newline at end of file From 88d22cb397c6a392f054a6153a415d6923f22b5e Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 26 Mar 2026 15:08:13 +0800 Subject: [PATCH 4/5] verify getBytesWritten returns correct value after complete --- .../apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java | 2 ++ .../org/apache/fluss/lake/lance/tiering/LanceTieringTest.java | 2 ++ .../org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index d5255005c7..697183e891 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -178,6 +178,8 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable writer.write(record); } IcebergWriteResult result = writer.complete(); + // Verify that getBytesWritten returns a positive value after complete + assertThat(writer.getBytesWritten()).isGreaterThan(0); byte[] serialized = writeResultSerializer.serialize(result); icebergWriteResults.add( writeResultSerializer.deserialize( diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java index 64dae36962..21d5c02178 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java @@ -159,6 +159,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { } // serialize/deserialize writeResult LanceWriteResult 
lanceWriteResult = lakeWriter.complete(); + // Verify that getBytesWritten returns -1 for the Lance writer, which does not support byte tracking + assertThat(lakeWriter.getBytesWritten()).isEqualTo(-1L); byte[] serialized = writeResultSerializer.serialize(lanceWriteResult); lanceWriteResults.add( writeResultSerializer.deserialize( diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java index edddc5b555..1a537ee468 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java @@ -957,6 +957,8 @@ private void writeData( } // serialize/deserialize writeResult PaimonWriteResult paimonWriteResult = lakeWriter.complete(); + // Verify that getBytesWritten returns a positive value after complete + assertThat(lakeWriter.getBytesWritten()).isGreaterThan(0); byte[] serialized = writeResultSerializer.serialize(paimonWriteResult); paimonWriteResults.add( writeResultSerializer.deserialize( From c6f35e896dd81530f9a644a6f12be8b446825a78 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 26 Mar 2026 16:07:27 +0800 Subject: [PATCH 5/5] exclude LakeWriter from test coverage --- fluss-test-coverage/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 399f64583e..e131686f89 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -445,6 +445,7 @@ org.apache.fluss.lake.lance.* org.apache.fluss.lake.iceberg.* + org.apache.fluss.lake.writer.LakeWriter org.apache.fluss.row.encode.iceberg.* org.apache.fluss.bucketing.IcebergBucketingFunction