diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 34a3ec86e6..e690eed8a1 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -83,7 +83,7 @@ public LogFetchCollector( * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ - public Map> collectFetch(final LogFetchBuffer logFetchBuffer) { + public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { Map> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; @@ -143,7 +143,7 @@ public Map> collectFetch(final LogFetchBuffer logF } } - return fetched; + return new ScanRecords(fetched); } private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..c8f87984b0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -22,7 +22,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -161,7 +160,7 @@ public boolean hasAvailableFetches() { return !logFetchBuffer.isEmpty(); } - public Map> collectFetch() { + public ScanRecords collectFetch() { return logFetchCollector.collectFetch(logFetchBuffer); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 33181bd7ae..9c24dbeb77 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -21,7 +21,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.WakeupException; import org.apache.fluss.metadata.SchemaGetter; @@ -41,8 +40,6 @@ import java.time.Duration; import java.util.Collections; import java.util.ConcurrentModificationException; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -141,17 +138,17 @@ public ScanRecords poll(Duration timeout) { long timeoutNanos = timeout.toNanos(); long startNanos = System.nanoTime(); do { - Map> fetchRecords = pollForFetches(); - if (fetchRecords.isEmpty()) { + ScanRecords scanRecords = pollForFetches(); + if (scanRecords.isEmpty()) { try { if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { // logFetcher waits for the timeout and no data in buffer, // so we return empty - return new ScanRecords(fetchRecords); + return scanRecords; } } catch (WakeupException e) { // wakeup() is called, we need to return empty - return new ScanRecords(fetchRecords); + return scanRecords; } } else { // before returning the fetched records, we can send off the next round of @@ -159,7 +156,7 @@ public ScanRecords poll(Duration timeout) { // while the user is handling the fetched records. logFetcher.sendFetches(); - return new ScanRecords(fetchRecords); + return scanRecords; } } while (System.nanoTime() - startNanos < timeoutNanos); @@ -247,10 +244,10 @@ public void wakeup() { logFetcher.wakeup(); } - private Map> pollForFetches() { - Map> fetchedRecords = logFetcher.collectFetch(); - if (!fetchedRecords.isEmpty()) { - return fetchedRecords; + private ScanRecords pollForFetches() { + ScanRecords scanRecords = logFetcher.collectFetch(); + if (!scanRecords.isEmpty()) { + return scanRecords; } // send any new fetches (won't resend pending fetches). diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index 99a108e2f3..471da0747a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -19,7 +19,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metadata.TestingMetadataUpdater; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.record.LogRecordReadContext; @@ -31,7 +30,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.fluss.record.TestData.DATA1; @@ -97,10 +95,9 @@ void testNormal() throws Exception { assertThat(completedFetch.isInitialized()).isFalse(); // Fetch the data and validate that we get all the records we want back. - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(1); - assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(1); + assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10); // When we collected the data from the buffer, this will cause the completed fetch to get // initialized. @@ -122,7 +119,7 @@ void testNormal() throws Exception { // Now attempt to collect more records from the fetch buffer. bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); } @Test @@ -147,14 +144,13 @@ void testCollectAfterUnassign() throws Exception { // unassign bucket 2 logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2)); - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); // should only contain records for bucket 1 - assertThat(bucketAndRecords.keySet()).containsExactly(tb1); + assertThat(bucketAndRecords.buckets()).containsExactly(tb1); // collect again, should be empty bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); } private DefaultCompletedFetch makeCompletedFetch( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java index 73c9b40fb6..dca64fb542 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java @@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - List scanRecords = records.get(tb0); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + List scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); @@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception { assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2); }); records = newSchemaLogFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0)).hasSize(20); - scanRecords = records.get(tb0); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0)).hasSize(20); + scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); newSchemaLogFetcher.close(); @@ -226,10 +226,10 @@ void testFetch() throws Exception { assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(2); - assertThat(records.get(tb0).size()).isEqualTo(10); - assertThat(records.get(tb1).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(2); + assertThat(records.records(tb0).size()).isEqualTo(10); + assertThat(records.records(tb1).size()).isEqualTo(10); // after collect fetch, the fetcher is empty. assertThat(logFetcher.hasAvailableFetches()).isFalse(); @@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0).size()).isEqualTo(10); } @Test diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java index c3ab3a352a..972949cb7b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java @@ -47,4 +47,13 @@ public interface LakeWriter extends Closeable { * @throws IOException if an I/O error occurs */ WriteResult complete() throws IOException; + + /** + * Returns the total bytes written to the lake storage in this write session. Computed from + * actual physical file sizes after {@link #complete()} is called. Returns -1 by default for + * backward compatibility with third-party implementations. + */ + default long getBytesWritten() { + return -1L; + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b11c3b92fc..e5baff6d70 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -280,4 +280,12 @@ public class MetricNames { public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond"; public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS = "numHugeAllocationsPerSecond"; + + // -------------------------------------------------------------------------------------------- + // metrics for tiering service + // -------------------------------------------------------------------------------------------- + + // for lake tiering metrics - operator level + public static final String TIERING_SERVICE_WRITTEN_BYTES = "writtenBytes"; + public static final String TIERING_SERVICE_WRITTEN_BYTES_RATE = "writtenBytesPerSecond"; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index eae584efed..6f0fc43b95 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -73,17 +74,31 @@ public TieringSourceReader( Duration pollTimeout) { super( elementsQueue, - new TieringSourceFetcherManager<>( - elementsQueue, - () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout), - context.getConfiguration(), - (ignore) -> {}), + createFetcherManager( + elementsQueue, context, connection, lakeTieringFactory, pollTimeout), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); this.connection = connection; } + private static TieringSourceFetcherManager createFetcherManager( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { + TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); + return new TieringSourceFetcherManager<>( + elementsQueue, + () -> + new TieringSplitReader<>( + connection, lakeTieringFactory, pollTimeout, tieringMetrics), + context.getConfiguration(), + (ignore) -> {}); + } + @Override public void start() { // we request a split only if we did not get splits during the checkpoint restore diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 1e530d69a3..68ef3e137e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -106,16 +107,21 @@ public class TieringSplitReader private final Set currentEmptySplits; + private final TieringMetrics tieringMetrics; + public TieringSplitReader( - Connection connection, LakeTieringFactory lakeTieringFactory) { - this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + Connection connection, + LakeTieringFactory lakeTieringFactory, + TieringMetrics tieringMetrics) { + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, tieringMetrics); } @VisibleForTesting protected TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, - Duration pollTimeout) { + Duration pollTimeout, + TieringMetrics tieringMetrics) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -129,6 +135,7 @@ protected TieringSplitReader( this.currentPendingSnapshotSplits = new ArrayDeque<>(); this.reachTieringMaxDurationTables = new HashSet<>(); this.pollTimeout = pollTimeout; + this.tieringMetrics = tieringMetrics; } @Override @@ -436,6 +443,7 @@ private TableBucketWriteResult completeLakeWriter( WriteResult writeResult = null; if (lakeWriter != null) { writeResult = lakeWriter.complete(); + tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten()); lakeWriter.close(); } return toTableBucketWriteResult( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java new file mode 100644 index 0000000000..f7cc93f3aa --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source.metrics; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metrics.MetricNames; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; + +/** + * A collection class for handling metrics in Tiering source reader. + * + *

All metrics are registered under group "fluss.tieringService", which is a child group of + * {@link org.apache.flink.metrics.groups.OperatorMetricGroup}. + * + *

The following metrics are available: + * + *

    + *
  • {@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes written to the lake + * since the job started (physical file sizes). + *
  • {@code fluss.tieringService.writtenBytesPerSecond} - Meter: write bytes-per-second rate + * derived from the counter using a 60-second sliding window. + *
+ */ +@Internal +public class TieringMetrics { + + // Metric group names + public static final String FLUSS_METRIC_GROUP = "fluss"; + public static final String TIERING_SERVICE_GROUP = "tieringService"; + + private final Counter writtenBytesCounter; + + public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) { + MetricGroup tieringServiceGroup = + sourceReaderMetricGroup + .addGroup(FLUSS_METRIC_GROUP) + .addGroup(TIERING_SERVICE_GROUP); + + this.writtenBytesCounter = + tieringServiceGroup.counter(MetricNames.TIERING_SERVICE_WRITTEN_BYTES); + tieringServiceGroup.meter( + MetricNames.TIERING_SERVICE_WRITTEN_BYTES_RATE, new MeterView(writtenBytesCounter)); + } + + /** Records bytes written to the lake. Called once per bucket completion. */ + public void recordBytesWritten(long bytes) { + writtenBytesCounter.inc(bytes); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index fdf7a84771..447ada7a7a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -26,6 +26,7 @@ import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -38,6 +39,8 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -277,7 +280,12 @@ void testTieringMixTables() throws Exception { } private TieringSplitReader createTieringReader(Connection connection) { - return new TieringSplitReader<>(connection, new TestingLakeTieringFactory()); + final TieringMetrics tieringMetrics = + new TieringMetrics( + InternalSourceReaderMetricGroup.mock( + new MetricListener().getMetricGroup())); + return new TieringSplitReader<>( + connection, new TestingLakeTieringFactory(), tieringMetrics); } private void verifyTieringRows( diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java index 227112eae8..32c678ed05 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java @@ -28,6 +28,7 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; @@ -43,6 +44,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -59,6 +61,7 @@ public class IcebergLakeWriter implements LakeWriter { private final Catalog icebergCatalog; private final Table icebergTable; private final RecordWriter recordWriter; + private long bytesWritten = 0L; @Nullable private final ExecutorService compactionExecutor; @Nullable private CompletableFuture compactionFuture; @@ -111,6 +114,10 @@ public void write(LogRecord record) throws IOException { public IcebergWriteResult complete() throws IOException { try { WriteResult writeResult = recordWriter.complete(); + bytesWritten = + Arrays.stream(writeResult.dataFiles()) + .mapToLong(DataFile::fileSizeInBytes) + .sum(); RewriteDataFileResult rewriteDataFileResult = null; if (compactionFuture != null) { @@ -122,6 +129,11 @@ public IcebergWriteResult complete() throws IOException { } } + @Override + public long getBytesWritten() { + return bytesWritten; + } + @Override public void close() throws IOException { try { diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index d5255005c7..697183e891 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -178,6 +178,8 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable writer.write(record); } IcebergWriteResult result = writer.complete(); + // Verify that getBytesWritten returns a positive value after complete + assertThat(writer.getBytesWritten()).isGreaterThan(0); byte[] serialized = writeResultSerializer.serialize(result); icebergWriteResults.add( writeResultSerializer.deserialize( diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java index 64dae36962..21d5c02178 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java @@ -159,6 +159,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { } // serialize/deserialize writeResult LanceWriteResult lanceWriteResult = lakeWriter.complete(); + // Verify that getBytesWritten returns a positive value after complete + assertThat(lakeWriter.getBytesWritten()).isEqualTo(-1L); byte[] serialized = writeResultSerializer.serialize(lanceWriteResult); lanceWriteResults.add( writeResultSerializer.deserialize( diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8472b825b0..2b23991a94 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -26,8 +26,10 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import java.io.IOException; import java.util.Collections; @@ -41,6 +43,7 @@ public class PaimonLakeWriter implements LakeWriter { private final Catalog paimonCatalog; private final RecordWriter recordWriter; + private long bytesWritten = 0L; public PaimonLakeWriter( PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext) @@ -84,9 +87,21 @@ public PaimonWriteResult complete() throws IOException { } catch (Exception e) { throw new IOException("Failed to complete Paimon write.", e); } + if (commitMessage instanceof CommitMessageImpl) { + bytesWritten = + ((CommitMessageImpl) commitMessage) + .newFilesIncrement().newFiles().stream() + .mapToLong(DataFileMeta::fileSize) + .sum(); + } return new PaimonWriteResult(commitMessage); } + @Override + public long getBytesWritten() { + return bytesWritten; + } + @Override public void close() throws IOException { try { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java index edddc5b555..1a537ee468 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java @@ -957,6 +957,8 @@ private void writeData( } // serialize/deserialize writeResult PaimonWriteResult paimonWriteResult = lakeWriter.complete(); + // Verify that getBytesWritten returns a positive value after complete + assertThat(lakeWriter.getBytesWritten()).isGreaterThan(0); byte[] serialized = writeResultSerializer.serialize(paimonWriteResult); paimonWriteResults.add( writeResultSerializer.deserialize( diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 399f64583e..e131686f89 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -445,6 +445,7 @@ org.apache.fluss.lake.lance.* org.apache.fluss.lake.iceberg.* + org.apache.fluss.lake.writer.LakeWriter org.apache.fluss.row.encode.iceberg.* org.apache.fluss.bucketing.IcebergBucketingFunction diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index ade4be9c61..a5c2598a08 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -1125,6 +1125,36 @@ How to Use Flink Metrics, you can see [Flink Metrics](https://nightlies.apache.o Table The output records per second. Meter - + + + + +### Tiering Service Metrics + +These metrics are exposed by the Flink-based tiering source reader when running the Lake Tiering Service. +All metrics are registered under the `fluss.tieringService` metric group, which is a child of the Flink `SourceReaderMetricGroup`. + + + + + + + + + + + + + + + + + + + + + + +
Metrics NameLevelDescriptionType
writtenBytesFlink Source OperatorThe cumulative bytes written to lake storage since the tiering job started. This measures the actual physical file sizes written to the lake (Iceberg, Paimon, etc.), not the logical record sizes. Returns -1 for lake writers that do not support byte tracking.Counter
writtenBytesPerSecondFlink Source OperatorThe write throughput rate in bytes per second, derived from the writtenBytes counter using a 60-second sliding window.Meter
\ No newline at end of file