Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public LogFetchCollector(
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logFetchBuffer) {
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;

Expand Down Expand Up @@ -143,7 +143,7 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
}
}

return fetched;
return new ScanRecords(fetched);
}

private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
Expand Down Expand Up @@ -161,7 +160,7 @@ public boolean hasAvailableFetches() {
return !logFetchBuffer.isEmpty();
}

public Map<TableBucket, List<ScanRecord>> collectFetch() {
public ScanRecords collectFetch() {
return logFetchCollector.collectFetch(logFetchBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.WakeupException;
import org.apache.fluss.metadata.SchemaGetter;
Expand All @@ -41,8 +40,6 @@
import java.time.Duration;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -141,25 +138,25 @@ public ScanRecords poll(Duration timeout) {
long timeoutNanos = timeout.toNanos();
long startNanos = System.nanoTime();
do {
Map<TableBucket, List<ScanRecord>> fetchRecords = pollForFetches();
if (fetchRecords.isEmpty()) {
ScanRecords scanRecords = pollForFetches();
if (scanRecords.isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
// so we return empty
return new ScanRecords(fetchRecords);
return scanRecords;
}
} catch (WakeupException e) {
// wakeup() is called, we need to return empty
return new ScanRecords(fetchRecords);
return scanRecords;
}
} else {
// before returning the fetched records, we can send off the next round of
// fetches and avoid block waiting for their responses to enable pipelining
// while the user is handling the fetched records.
logFetcher.sendFetches();

return new ScanRecords(fetchRecords);
return scanRecords;
}
} while (System.nanoTime() - startNanos < timeoutNanos);

Expand Down Expand Up @@ -247,10 +244,10 @@ public void wakeup() {
logFetcher.wakeup();
}

private Map<TableBucket, List<ScanRecord>> pollForFetches() {
Map<TableBucket, List<ScanRecord>> fetchedRecords = logFetcher.collectFetch();
if (!fetchedRecords.isEmpty()) {
return fetchedRecords;
private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
if (!scanRecords.isEmpty()) {
return scanRecords;
}

// send any new fetches (won't resend pending fetches).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metadata.TestingMetadataUpdater;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecordReadContext;
Expand All @@ -31,7 +30,6 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.record.TestData.DATA1;
Expand Down Expand Up @@ -97,10 +95,9 @@ void testNormal() throws Exception {
assertThat(completedFetch.isInitialized()).isFalse();

// Fetch the data and validate that we get all the records we want back.
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(1);
assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10);
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(1);
assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10);

// When we collected the data from the buffer, this will cause the completed fetch to get
// initialized.
Expand All @@ -122,7 +119,7 @@ void testNormal() throws Exception {

// Now attempt to collect more records from the fetch buffer.
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(0);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
}

@Test
Expand All @@ -147,14 +144,13 @@ void testCollectAfterUnassign() throws Exception {
// unassign bucket 2
logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2));

Map<TableBucket, List<ScanRecord>> bucketAndRecords =
logFetchCollector.collectFetch(logFetchBuffer);
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
// should only contain records for bucket 1
assertThat(bucketAndRecords.keySet()).containsExactly(tb1);
assertThat(bucketAndRecords.buckets()).containsExactly(tb1);

// collect again, should be empty
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(0);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
}

private DefaultCompletedFetch makeCompletedFetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.hasAvailableFetches()).isTrue();
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
List<ScanRecord> scanRecords = records.get(tb0);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
List<ScanRecord> scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);

Expand Down Expand Up @@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(tb0)).hasSize(20);
scanRecords = records.get(tb0);
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
newSchemaLogFetcher.close();
Expand Down Expand Up @@ -226,10 +226,10 @@ void testFetch() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});

Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(2);
assertThat(records.get(tb0).size()).isEqualTo(10);
assertThat(records.get(tb1).size()).isEqualTo(10);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(2);
assertThat(records.records(tb0).size()).isEqualTo(10);
assertThat(records.records(tb1).size()).isEqualTo(10);

// after collect fetch, the fetcher is empty.
assertThat(logFetcher.hasAvailableFetches()).isFalse();
Expand Down Expand Up @@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
assertThat(logFetcher.hasAvailableFetches()).isTrue();
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
});
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(tb0).size()).isEqualTo(10);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.records(tb0).size()).isEqualTo(10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ public interface LakeWriter<WriteResult> extends Closeable {
* @throws IOException if an I/O error occurs
*/
WriteResult complete() throws IOException;

/**
 * Returns the total bytes written to the lake storage in this write session. Computed from
 * actual physical file sizes after {@link #complete()} is called. Returns -1 by default for
 * backward compatibility with third-party implementations.
 *
 * <p>NOTE: callers must treat a negative return value as "size unknown" and skip it when
 * aggregating (e.g. into a metrics counter); it is a sentinel, not a real byte count. The
 * value is only meaningful after {@link #complete()} has returned — calling it earlier may
 * yield a partial or undefined result depending on the implementation.
 */
default long getBytesWritten() {
    return -1L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,12 @@ public class MetricNames {
public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond";
public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS =
"numHugeAllocationsPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for tiering service
// --------------------------------------------------------------------------------------------

// for lake tiering metrics - operator level
public static final String TIERING_SERVICE_WRITTEN_BYTES = "writtenBytes";
public static final String TIERING_SERVICE_WRITTEN_BYTES_RATE = "writtenBytesPerSecond";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.Connection;
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
import org.apache.fluss.lake.writer.LakeTieringFactory;
Expand Down Expand Up @@ -73,17 +74,31 @@ public TieringSourceReader(
Duration pollTimeout) {
super(
elementsQueue,
new TieringSourceFetcherManager<>(
elementsQueue,
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
context.getConfiguration(),
(ignore) -> {}),
createFetcherManager(
elementsQueue, context, connection, lakeTieringFactory, pollTimeout),
new TableBucketWriteResultEmitter<>(),
context.getConfiguration(),
context);
this.connection = connection;
}

/**
 * Builds the split fetcher manager for this reader, wiring a single shared {@link
 * TieringMetrics} instance (registered against the reader's metric group) into every {@link
 * TieringSplitReader} the manager creates.
 */
private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetcherManager(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
                elementsQueue,
        SourceReaderContext context,
        Connection connection,
        LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
        Duration pollTimeout) {
    // One metrics holder is shared by all split readers created by the supplier below.
    final TieringMetrics metrics = new TieringMetrics(context.metricGroup());
    return new TieringSourceFetcherManager<>(
            elementsQueue,
            () -> {
                return new TieringSplitReader<>(
                        connection, lakeTieringFactory, pollTimeout, metrics);
            },
            context.getConfiguration(),
            (unused) -> {});
}

@Override
public void start() {
// we request a split only if we did not get splits during the checkpoint restore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.flink.source.reader.BoundedSplitReader;
import org.apache.fluss.flink.source.reader.RecordAndPos;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
Expand Down Expand Up @@ -106,16 +107,21 @@ public class TieringSplitReader<WriteResult>

private final Set<TieringSplit> currentEmptySplits;

private final TieringMetrics tieringMetrics;

public TieringSplitReader(
Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
TieringMetrics tieringMetrics) {
this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, tieringMetrics);
}

@VisibleForTesting
protected TieringSplitReader(
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout) {
Duration pollTimeout,
TieringMetrics tieringMetrics) {
this.lakeTieringFactory = lakeTieringFactory;
// owned by TieringSourceReader
this.connection = connection;
Expand All @@ -129,6 +135,7 @@ protected TieringSplitReader(
this.currentPendingSnapshotSplits = new ArrayDeque<>();
this.reachTieringMaxDurationTables = new HashSet<>();
this.pollTimeout = pollTimeout;
this.tieringMetrics = tieringMetrics;
}

@Override
Expand Down Expand Up @@ -436,6 +443,7 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
WriteResult writeResult = null;
if (lakeWriter != null) {
writeResult = lakeWriter.complete();
tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten());
lakeWriter.close();
}
return toTableBucketWriteResult(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.source.metrics;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metrics.MetricNames;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;

/**
 * A collection class for handling metrics in Tiering source reader.
 *
 * <p>All metrics are registered under group "fluss.tieringService", which is a child group of
 * {@link org.apache.flink.metrics.groups.OperatorMetricGroup}.
 *
 * <p>The following metrics are available:
 *
 * <ul>
 *   <li>{@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes written to the lake
 *       since the job started (physical file sizes).
 *   <li>{@code fluss.tieringService.writtenBytesPerSecond} - Meter: write bytes-per-second rate
 *       derived from the counter (via {@link MeterView}'s default update interval).
 * </ul>
 */
@Internal
public class TieringMetrics {

    // Metric group names
    public static final String FLUSS_METRIC_GROUP = "fluss";
    public static final String TIERING_SERVICE_GROUP = "tieringService";

    /** Cumulative bytes written; also drives the per-second meter view. */
    private final Counter writtenBytesCounter;

    public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        MetricGroup tieringServiceGroup =
                sourceReaderMetricGroup
                        .addGroup(FLUSS_METRIC_GROUP)
                        .addGroup(TIERING_SERVICE_GROUP);

        this.writtenBytesCounter =
                tieringServiceGroup.counter(MetricNames.TIERING_SERVICE_WRITTEN_BYTES);
        tieringServiceGroup.meter(
                MetricNames.TIERING_SERVICE_WRITTEN_BYTES_RATE, new MeterView(writtenBytesCounter));
    }

    /**
     * Records bytes written to the lake. Called once per bucket completion.
     *
     * @param bytes number of bytes written; non-positive values are ignored because {@code
     *     LakeWriter#getBytesWritten()} returns {@code -1} for implementations that do not report
     *     sizes — counting the sentinel would decrement the cumulative counter and corrupt the
     *     derived rate meter.
     */
    public void recordBytesWritten(long bytes) {
        if (bytes > 0) {
            writtenBytesCounter.inc(bytes);
        }
    }
}
Loading