From 640308bf96121abbabd3f9fcad9ba094699af23d Mon Sep 17 00:00:00 2001
From: Ankit Solomon
Date: Mon, 12 Jan 2026 08:55:19 +0530
Subject: [PATCH] HBASE-29823 Control WAL flush and offset persistence from
 ReplicationSourceShipper

Fix UT
Address review comments
Addressed review comments
Spotless fix
Move configs to Endpoint peerConfig
---
 .../replication/ReplicationEndpoint.java      |   7 ++
 .../ReplicationSourceShipper.java             | 110 ++++++++++++++++++---
 2 files changed, 105 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..3407189fcf6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -283,4 +283,11 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * Hook invoked before persisting replication offsets, e.g. buffered endpoints can flush/close
+   * WALs here.
+   */
+  default void beforePersistingReplicationOffset() throws IOException {
+  }
 }
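
Illustrative sketch (not part of the patch): a buffered endpoint can override the
new default hook so that anything it has staged becomes durable before the shipper
records a new WAL offset; after a crash the worst case is then re-shipping, not
data loss. The sketch below extends the stock HBaseInterClusterReplicationEndpoint;
the class name and the bufferedSink field are hypothetical, and only
beforePersistingReplicationOffset() comes from this patch.

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;

    public class FlushingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
      // Hypothetical staging sink that replicate() writes into before the
      // edits reach their final destination.
      private OutputStream bufferedSink;

      @Override
      public void beforePersistingReplicationOffset() throws IOException {
        // Drain the buffer first: the shipper invokes this hook right before
        // updateLogPosition() persists the new offset, so an IOException here
        // keeps the offset from moving past unflushed data.
        if (bufferedSink != null) {
          bufferedSink.flush();
        }
      }
    }
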
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index d05e4fed045b..acca889d09c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,13 +21,16 @@
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -74,6 +77,12 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  private long accumulatedSizeSinceLastUpdate = 0L;
+  private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
+  private final long offsetUpdateIntervalMs;
+  private final long offsetUpdateSizeThresholdBytes;
+  private WALEntryBatch lastShippedBatch;
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

   public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
     ReplicationSourceWALReader walReader) {
@@ -90,6 +99,22 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati
       this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
     this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
       HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+    ReplicationPeer peer = source.getPeer();
+    if (peer != null && peer.getPeerConfig() != null) {
+      Map<String, String> configMap = peer.getPeerConfig().getConfiguration();
+      this.offsetUpdateIntervalMs = (configMap != null
+        && configMap.containsKey("hbase.replication.shipper.offset.update.interval.ms"))
+          ? Long.parseLong(configMap.get("hbase.replication.shipper.offset.update.interval.ms"))
+          : Long.MAX_VALUE;
+      this.offsetUpdateSizeThresholdBytes = (configMap != null
+        && configMap.containsKey("hbase.replication.shipper.offset.update.size.threshold"))
+          ? Long.parseLong(configMap.get("hbase.replication.shipper.offset.update.size.threshold"))
+          : -1L;
+    } else {
+      // If the peer is not initialized, fall back to immediate offset updates
+      this.offsetUpdateIntervalMs = Long.MAX_VALUE;
+      this.offsetUpdateSizeThresholdBytes = -1L;
+    }
   }

   @Override
@@ -105,10 +130,19 @@ public final void run() {
         sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
         continue;
       }
+      // check time-based offset persistence
+      if (shouldPersistLogPosition()) {
+        // Trigger offset persistence via the existing retry/backoff mechanism in shipEdits()
+        WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush();
+        if (emptyBatch != null) {
+          shipEdits(emptyBatch);
+        }
+      }
       try {
         WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
         LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
           entryBatch);
+
         if (entryBatch == null) {
           continue;
         }
@@ -133,6 +167,16 @@ public final void run() {
     }
   }

+  private WALEntryBatch createEmptyBatchForTimeBasedFlush() {
+    // Reuse the last shipped WAL position with 0 entries
+    if (lastShippedBatch == null) {
+      return null;
+    }
+    WALEntryBatch batch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath());
+    batch.setLastWalPosition(lastShippedBatch.getLastWalPosition());
+    return batch;
+  }
+
   private void noMoreData() {
     if (source.isRecovered()) {
       LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
@@ -154,15 +198,18 @@ protected void postFinish() {
   private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
-      return;
-    }
     int currentSize = (int) entryBatch.getHeapSize();
-    source.getSourceMetrics()
-      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
+    if (!entries.isEmpty()) {
+      source.getSourceMetrics()
+        .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
+    }
     while (isActive()) {
       try {
+        if (entries.isEmpty()) {
+          lastShippedBatch = entryBatch;
+          persistLogPosition();
+          return;
+        }
         try {
           source.tryThrottle(currentSize);
         } catch (InterruptedException e) {
@@ -190,13 +237,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-        // Clean up hfile references
-        for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
+        accumulatedSizeSinceLastUpdate += currentSize;
+        entriesForCleanUpHFileRefs.addAll(entries);
+        lastShippedBatch = entryBatch;
+        if (shouldPersistLogPosition()) {
+          persistLogPosition();
         }
-        // Log and clean up WAL logs
-        updateLogPosition(entryBatch);

         // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         // this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +276,45 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }

+  private boolean shouldPersistLogPosition() {
+    if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
+      return false;
+    }
+
+    // Default behaviour to update offset immediately after replicate()
+    if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
+      return true;
+    }
+
+    // Treat -1 as "size threshold not set" so an interval-only config still batches
+    boolean sizeExceeded = offsetUpdateSizeThresholdBytes != -1
+      && accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes;
+    boolean intervalElapsed =
+      EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs;
+    return sizeExceeded || intervalElapsed;
+  }
+
+  private void persistLogPosition() throws IOException {
+    if (lastShippedBatch == null) {
+      return;
+    }
+
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    endpoint.beforePersistingReplicationOffset();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      cleanUpHFileRefs(entry.getEdit());
+    }
+    entriesForCleanUpHFileRefs.clear();
+
+    accumulatedSizeSinceLastUpdate = 0;
+    lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
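
Illustrative usage (not part of the patch): the two peer-config keys read in the
shipper's constructor control how often replication offsets are persisted. When
neither key is set, the shipper keeps the old behaviour and persists after every
replicated batch; when they are set, offset updates are batched until the
accumulated shipped size or the elapsed time crosses its threshold, whichever
comes first. A minimal sketch in Java against the standard Admin API; the peer
id "1" and both values are made-up examples:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class UpdateShipperOffsetConfig {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
          ReplicationPeerConfig current = admin.getReplicationPeerConfig("1");
          ReplicationPeerConfig updated = ReplicationPeerConfig.newBuilder(current)
            // Persist offsets at least once a minute...
            .putConfiguration("hbase.replication.shipper.offset.update.interval.ms", "60000")
            // ...or once 64 MB of entries have been shipped, whichever comes first.
            .putConfiguration("hbase.replication.shipper.offset.update.size.threshold",
              String.valueOf(64L * 1024 * 1024))
            .build();
          admin.updateReplicationPeerConfig("1", updated);
        }
      }
    }

Both values travel as plain strings in the peer config, which is why the byte
threshold is rendered with String.valueOf(); the shipper parses them back with
Long.parseLong() in its constructor.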