HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper #7617
base: master
@@ -21,13 +21,16 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;

@@ -74,6 +77,12 @@ public enum WorkerState {
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;
  private long accumulatedSizeSinceLastUpdate = 0L;
  private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
  private final long offsetUpdateIntervalMs;
  private final long offsetUpdateSizeThresholdBytes;
  private WALEntryBatch lastShippedBatch;
  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
    ReplicationSourceWALReader walReader) {

@@ -90,6 +99,22 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
    ReplicationPeer peer = source.getPeer();
    if (peer != null && peer.getPeerConfig() != null) {
      Map<String, String> configMap = peer.getPeerConfig().getConfiguration();
      this.offsetUpdateIntervalMs = (configMap != null
        && configMap.containsKey("hbase.replication.shipper.offset.update.interval.ms"))
          ? Long.parseLong(configMap.get("hbase.replication.shipper.offset.update.interval.ms"))
          : Long.MAX_VALUE;
      this.offsetUpdateSizeThresholdBytes = (configMap != null
        && configMap.containsKey("hbase.replication.shipper.offset.update.size.threshold"))
          ? Long.parseLong(configMap.get("hbase.replication.shipper.offset.update.size.threshold"))
          : -1L;
    } else {
      // If peer is not initialized
      this.offsetUpdateIntervalMs = Long.MAX_VALUE;
      this.offsetUpdateSizeThresholdBytes = -1;
    }
  }

  @Override

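The two keys read above come from the peer-level configuration map (ReplicationPeerConfig.getConfiguration()), so they can be tuned per peer. Below is a minimal sketch of how they could be set through the standard Admin API; the peer id "1" and the values (30 seconds, 64 MB) are illustrative examples, only the key names come from this patch.

// Illustrative only: peer id and values are examples; the key names are the ones read in the
// constructor above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class SetShipperOffsetTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
      String peerId = "1"; // example peer id
      ReplicationPeerConfig current = admin.getReplicationPeerConfig(peerId);
      ReplicationPeerConfig updated = ReplicationPeerConfig.newBuilder(current)
        // persist the shipped WAL offset at most every 30 seconds ...
        .putConfiguration("hbase.replication.shipper.offset.update.interval.ms", "30000")
        // ... or as soon as 64 MB of edits have been shipped since the last update
        .putConfiguration("hbase.replication.shipper.offset.update.size.threshold",
          String.valueOf(64L * 1024 * 1024))
        .build();
      admin.updateReplicationPeerConfig(peerId, updated);
    }
  }
}

When neither key is present, the constructor falls back to Long.MAX_VALUE and -1, which shouldPersistLogPosition() treats as "persist after every shipped batch", matching today's behaviour.
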
@@ -105,10 +130,17 @@ public final void run() {
        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
        continue;
      }
      // check time-based offset persistence
      if (shouldPersistLogPosition()) {
        // Trigger offset persistence via existing retry/backoff mechanism in shipEdits()
        WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush();
        if (emptyBatch != null) shipEdits(emptyBatch);
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);

        if (entryBatch == null) {
          continue;
        }

@@ -133,6 +165,16 @@
    }
  }

  private WALEntryBatch createEmptyBatchForTimeBasedFlush() {
    // Reuse last shipped WAL position with 0 entries
    if (lastShippedBatch == null) {
      return null;
    }
    WALEntryBatch batch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath());
    batch.setLastWalPosition(lastShippedBatch.getLastWalPosition());
    return batch;
  }

  private void noMoreData() {
    if (source.isRecovered()) {
      LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,

@@ -154,15 +196,16 @@ protected void postFinish() {
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        if (entries.isEmpty()) {
          lastShippedBatch = entryBatch;
          persistLogPosition();
          return;
        }
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {

@@ -190,13 +233,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("shipped entry {}: ", entry);

        accumulatedSizeSinceLastUpdate += currentSize;
        entriesForCleanUpHFileRefs.addAll(entries);
        lastShippedBatch = entryBatch;
        if (shouldPersistLogPosition()) {
          persistLogPosition();
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
        // this sizeExcludeBulkLoad has to use same calculation that when calling

@@ -229,6 +272,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
      }
    }

  private boolean shouldPersistLogPosition() {
    if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
      return false;
    }

    // Default behaviour to update offset immediately after replicate()
    if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
      return true;
    }

    return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes)
      || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs);
  }

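For illustration of the rule above (the values here are examples, not defaults from this patch): with hbase.replication.shipper.offset.update.interval.ms = 30000 and hbase.replication.shipper.offset.update.size.threshold = 67108864 (64 MB), a shipper that has accumulated 80 MB of shipped edits only 10 seconds after the last update persists immediately through the size branch, while one that has shipped just 5 MB persists once 30 seconds have elapsed through the time branch. If neither key is set on the peer, the defaults of -1 and Long.MAX_VALUE make the method return true for every non-empty shipment, preserving the current persist-after-each-batch behaviour.
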
Comment on lines 275 to 287

Contributor
This should be in the Endpoint, as the decision varies according to the type of endpoint. Today we have basically two types, buffered and non-buffered. If we have new endpoint types in the future, we might again need to come here and add the related logic.

Contributor
Please see my comment above to get more context.

  private void persistLogPosition() throws IOException {
    if (lastShippedBatch == null) {

Contributor
Since we could accumulate different batches in the above loop, a null batch does not mean we haven't shipped anything out? Why do we just return here if lastShippedBatch is null?

Author
As far as I understand, lastShippedBatch being null means no batch has been replicated yet, so we don't need to update the offset. Please correct me if I am wrong here: lastShippedBatch is null by default during ReplicationSourceShipper initialisation, and as soon as a batch is replicated it is updated.

Contributor
Ah, OK, you do not reset lastShippedBatch when reading a new batch. But it still makes me a bit nervous how we can get here when lastShippedBatch is null...

      return;
    }

    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
    endpoint.beforePersistingReplicationOffset();

    // Clean up hfile references
    for (Entry entry : entriesForCleanUpHFileRefs) {
      cleanUpHFileRefs(entry.getEdit());
    }
    entriesForCleanUpHFileRefs.clear();

    accumulatedSizeSinceLastUpdate = 0;
    lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();

    // Log and clean up WAL logs
    updateLogPosition(lastShippedBatch);
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {

Rather than having these stagedWalSize and lastShippedBatch as global variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which should be defined/implemented in the endpoints, btw, see my other comment related) and persistLogPosition().

We want to determine whether we need to persist the log position in the shipper, based on some configurations, not triggered by the replication endpoint. Users can choose different configuration values based on different replication endpoint implementations.

IMHO, it doesn't look very cohesive. The shipper seems to be taking decisions based on specific endpoint implementations. If new endpoint implementations with different logic for updating the log position come up in the future, we would need to revisit the shipper again.

I think time-based and size-based persistence is enough for most cases? If in the future we have some special endpoint which needs a new type of decision, we can add a new mechanism, no problem.
The reason we do not want to trigger persistence only from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover and pressure on the replication storage. So here I suggest we introduce general mechanisms to control the persistence of the log position, which users can tune with different approaches.

Oh, so your idea is to allow the shipper to decide when to persist the log position based on time and/or storage usage by WALs, regardless of the endpoint implementation? That would be fine for me, but then we would need to adjust the shouldPersistLogPosition method accordingly.

Are you referring to your original comment about passing the entire entryBatch object?

No, I meant the check that was being done inside shouldPersistLogPosition, which would cause the buffer size to be only considered for specific endpoint types.
@ankitsol has already addressed it on a recent commit.
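
The buffered/non-buffered distinction discussed above is easiest to see in isolation. The sketch below is not HBase code: the types are simplified stand-ins, and only the hook name beforePersistingReplicationOffset() mirrors what this PR calls on the endpoint before updateLogPosition(). It shows why a buffered endpoint needs such a callback before the WAL offset is persisted, while a non-buffered endpoint can leave it as a no-op.

// Self-contained illustration (not HBase code) of the ordering being discussed: the shipper
// gives the endpoint a chance to flush anything it has buffered before the WAL offset is
// persisted. All class names here are hypothetical.
import java.util.ArrayList;
import java.util.List;

interface Endpoint {
  void replicate(List<String> edits);

  // Buffered endpoints override this; non-buffered ones keep the no-op default.
  default void beforePersistingReplicationOffset() {
  }
}

class NonBufferedEndpoint implements Endpoint {
  @Override
  public void replicate(List<String> edits) {
    // Edits are pushed straight to the sink, so they are already durable there.
    edits.forEach(e -> System.out.println("pushed " + e));
  }
}

class BufferedEndpoint implements Endpoint {
  private final List<String> buffer = new ArrayList<>();

  @Override
  public void replicate(List<String> edits) {
    // Edits are only staged locally; losing them after the offset moved forward would drop data.
    buffer.addAll(edits);
  }

  @Override
  public void beforePersistingReplicationOffset() {
    System.out.println("flushing " + buffer.size() + " staged edits to the sink");
    buffer.clear();
  }
}

public class OffsetPersistenceOrder {
  public static void main(String[] args) {
    List<Endpoint> endpoints = List.of(new NonBufferedEndpoint(), new BufferedEndpoint());
    for (Endpoint endpoint : endpoints) {
      endpoint.replicate(List.of("edit-1", "edit-2"));
      // Same ordering as persistLogPosition(): let the endpoint flush, then record the offset.
      endpoint.beforePersistingReplicationOffset();
      System.out.println("WAL offset persisted for " + endpoint.getClass().getSimpleName());
    }
  }
}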