[ISSUE #10489] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes #10519
qianye1001 wants to merge 10 commits into
Codecov Report (❌ patch coverage)

Coverage Diff

| Metric | develop | #10519 | +/- |
|---|---|---|---|
| Coverage | 48.12% | 48.23% | +0.10% |
| Complexity | 13352 | 13535 | +183 |
| Files | 1377 | 1383 | +6 |
| Lines | 100720 | 101531 | +811 |
| Branches | 13012 | 13111 | +99 |
| Hits | 48476 | 48976 | +500 |
| Misses | 46309 | 46553 | +244 |
| Partials | 5935 | 6002 | +67 |
RockteMQ-AI left a comment
Review by github-manager-bot
Summary
This PR adds batch support for ChangeInvisibleTime across the full stack (remoting protocol → client → broker → proxy), reducing network round-trips and RocksDB flushes when multiple pop consumer records need invisible-time updates in the same batch. The feature is gated by enableBatchChangeInvisibleTime and falls back to per-entry processing when disabled.
Findings
- [Critical] `PopConsumerCache.writeAndDeleteRecords`: busy-wait spin lock. `while (!consumerLockService.tryLock(...)) { Thread.yield(); }` is a CPU-consuming spin loop. Under high contention (many concurrent batch operations on the same group/topic), this can waste significant CPU. Consider using a timed lock (e.g., `tryLock` with a timeout plus retry with backoff) or a blocking lock abstraction.
- [Critical] `PopConsumerCache.writeAndDeleteRecords`: lock topic derived from first record only. The lock topic is parsed from `lockRecord`, which comes from `validateAndGetLockRecord(writeRecordList, deleteRecordList)`. If the batch somehow contains records spanning different topics (despite the header-level validation), this could lead to incorrect locking. Please add an assertion or defensive check that all records in the batch share the same topic+group.
- [Warning] `ChangeInvisibleTimeProcessor.processBatchRequest`: batch size splitting. The proxy splits oversized batches (`splitOversizedBrokerGroup`), but the broker-side `processBatchRequest` also checks `popBatchChangeInvisibleTimeMaxSize`. If the proxy split threshold and broker max size are misconfigured, the broker will reject the batch and all entries fail. Consider documenting the expected relationship between these two config values, or having the broker auto-split instead of rejecting.
- [Warning] `DefaultReceiptHandleManager.scheduleRenewTask`: no max batch size for renew. The renew path collects all expiring handles in a group into a single batch call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this could create a very large batch. Consider capping the batch size and splitting into multiple calls.
- [Warning] `ConsumerProcessor.filterPopResult`: TO_RETURN batch not flushed on error path. The `toReturnMessageList` is accumulated during the message loop and sent after the loop via `batchChangeInvisibleTime`. If an exception occurs during the loop (e.g., in the `REJECT` case), the accumulated TO_RETURN handles may not be processed. Consider wrapping the post-loop batch send in a `finally` block or ensuring it runs regardless of earlier exceptions.
- [Warning] `PopConsumerCache.writeAndDeleteRecords`: buffer eviction race. When records are in the in-memory buffer and `writeAndDeleteRecords` is called, the method splits records into buffer vs. store paths. The buffer deletion path calls `buffer.remove(record)` while the store path adds to `storeDeleteRecords`. If a concurrent `scanExpiredRecords` promotes a record from buffer to store between the split and the actual store delete, the record could be missed. The lock should prevent this, but please verify the lock scope covers the full split+write+delete sequence.
- [Info] `PopConsumerRocksdbStore.writeAndDeleteRecords`: delete-before-put ordering. The `WriteBatch` processes all deletes before puts. This is correct for the case where the same key appears in both lists (old record deleted, new record written with different key due to changed popTime). The ordering is sound.
- [Info] Good defensive handling of null entries in `prepareBatchRequestEntries` and `processBatchChangeInvisibleTimeResponse`. Each null entry gets a failure response without aborting the entire batch.
- [Info] Test coverage is comprehensive: it covers cache, RocksDB store, service layer, processor, client API, and proxy components. The split-by-broker and oversized-batch tests in the proxy are particularly valuable.
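
The timed-lock alternative mentioned in the first [Critical] finding can be sketched as follows. This is only an illustration: it assumes a plain `java.util.concurrent.locks.ReentrantLock` as a stand-in for `consumerLockService`, whose real API is not shown in this thread, and `BackoffLock` / `runWithLock` are hypothetical names rather than RocketMQ code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper, not part of RocketMQ.
final class BackoffLock {
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Runs the action under the lock. Each attempt parks the thread for up to
     * waitMillis instead of spinning with Thread.yield(); the wait doubles after
     * every failed attempt, capped at maxWaitMillis.
     */
    <T> T runWithLock(Supplier<T> action, int maxAttempts, long initialWaitMillis, long maxWaitMillis) {
        long waitMillis = initialWaitMillis;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            boolean acquired;
            try {
                acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up rather than keep waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for lock", e);
            }
            if (acquired) {
                try {
                    return action.get();
                } finally {
                    lock.unlock();
                }
            }
            waitMillis = Math.min(waitMillis * 2, maxWaitMillis);
        }
        throw new IllegalStateException("lock not acquired after " + maxAttempts + " attempts");
    }
}
```

Unlike the spin loop, a caller that cannot get the lock fails after a bounded time, so the batch path would need to decide whether to retry or report a per-entry failure.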
Suggestions
- Replace the `Thread.yield()` spin loop with a timed wait or `CompletableFuture`-based lock to avoid CPU waste under contention.
- Add a defensive assertion that all records in a batch share the same topic+group before acquiring the lock.
- Consider aligning or documenting the relationship between proxy `batchChangeInvisibleTimeMaxSize` and broker `popBatchChangeInvisibleTimeMaxSize`.
- Add a max batch size cap in the renew path to handle edge cases with large numbers of expiring handles.
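
The last suggestion, capping the renew batch size, amounts to partitioning the list of expiring handles before sending. A minimal sketch, assuming the cap is passed in as a plain `int` (in the PR it would presumably come from proxy configuration); `BatchPartitioner` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RocketMQ.
final class BatchPartitioner {
    /** Splits items into consecutive chunks of at most maxBatchSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

Keeping the proxy-side cap at or below the broker-side limit would also avoid the whole-batch rejection described in the batch size splitting warning.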
Cross-repo Note
This PR adds new request/response protocol types (BatchChangeInvisibleTimeRequestHeader, BatchChangeInvisibleTimeRequestBody, etc.) in rocketmq-remoting. The rocketmq-clients repos do not need changes since the batch protocol is internal (broker ↔ proxy), not client-facing.
Automated review by github-manager-bot
RockteMQ-AI left a comment
Review by github-manager-bot (re-review)
Summary
Re-review after 2 new commits:
- 5df26c4 Fix single change invisible time KV write batch
- 76cc6b4 Serialize oversized batch change invisible time chunks
Changes Since Last Review
Commit 1 — Single-entry path fix: The changeInvisibilityDuration method now uses writeAndDeleteRecords for both the single-entry and batch paths, ensuring a single batched KV operation instead of separate write + delete calls. This is a clean improvement.
Commit 2 — Oversized batch chunking: The proxy now splits large handleMessageList into chunks of batchChangeInvisibleTimeMaxNum and processes them sequentially via thenCompose. This avoids overwhelming a single broker with many concurrent write-heavy batches. The fallback to per-item requests on failure is preserved.
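
The sequential chunk processing described for Commit 2 can be sketched with a `thenCompose` chain. This is an assumption-laden illustration, not the PR's code: `SequentialChunks` and `sendSequentially` are hypothetical names, the `sender` function stands in for the broker call, and the per-item fallback on failure is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, not part of RocketMQ.
final class SequentialChunks {
    /**
     * Sends chunks one after another: the sender is invoked for chunk i+1 only
     * after the future for chunk i has completed. Results are concatenated in
     * chunk order.
     */
    static <T, R> CompletableFuture<List<R>> sendSequentially(
            List<List<T>> chunks, Function<List<T>, CompletableFuture<List<R>>> sender) {
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(new ArrayList<>());
        for (List<T> chunk : chunks) {
            chain = chain.thenCompose(acc ->
                sender.apply(chunk).thenApply(part -> {
                    acc.addAll(part);
                    return acc;
                }));
        }
        return chain;
    }
}
```

Because each chunk is dispatched inside the previous stage's `thenCompose`, at most one batch per chain is in flight at a time, which is the property the re-review credits with limiting write pressure on a single broker.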
Updated Findings
- [Resolved] The single-entry path now correctly uses the batch API — good fix.
- [Resolved] Oversized batches are now chunked and serialized — addresses the concern about large batch pressure.
- [Info] `PopConsumerCache.writeAndDeleteRecords`: The spin-lock pattern (`while (!tryLock) { Thread.yield(); }`) is consistent with the existing `deleteRecords` method in the same class. While not ideal under high contention, this follows the established codebase convention. Acceptable for now; consider a timed backoff in a future cleanup.
- [Positive] The `validateAndGetLockRecord` method correctly ensures all records share the same lock key, preventing cross-topic/group lock conflicts.
- [Positive] Good test coverage for the new `writeAndDeleteRecords` method: tests cover empty deletes, buffered records, topic lock routing, and mixed lock key rejection.
Verdict
The two new commits meaningfully address the previous review findings. The implementation looks solid.
Automated re-review by github-manager-bot
    public static final int BATCH_ACK_MESSAGE = 200151;
    public static final int PEEK_MESSAGE = 200052;
    public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
    public static final int BATCH_CHANGE_MESSAGE_INVISIBLETIME = 200153;

Same logic as batch ack, so let's just use this one.
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after 4 new commits)
Summary
Four new commits since the previous review:
- Refine batch entries: Introduces `ChangeInvisibleTimeRequestEntry` as a first-class remoting type with proper serialization.
- Remove redundant validation: Removes the batch-level topic/group validation in `PopConsumerCache` (now enforced at the header level).
- Simplify write/delete: Replaces the busy-wait spin lock with a clean split-buffer/store approach.
- Fix serializable compat: Skips `transient` fields in the reflection-based compatibility test.
Previous Findings — Status
- [Critical] Busy-wait spin lock in `writeAndDeleteRecords`: Resolved. The spin lock is removed. The new implementation splits records into buffer vs. store paths and processes them independently. Much cleaner.
- [Critical] Lock topic derived from first record only: Resolved. The lock mechanism is removed entirely. The batch header enforces topic+group consistency, and the split logic processes each record independently.
- [Warning] Batch size splitting: Partially addressed. The broker now checks `batchMaxNum` and rejects oversized batches. The proxy splits oversized groups via `splitOversizedBrokerGroup`. However, see remaining concern below.
- [Warning] TO_RETURN batch not flushed on error: Likely resolved. The loop catches per-message exceptions and continues, so the post-loop batch send should execute. But consider wrapping the post-loop `batchChangeInvisibleTime` call in a `finally` block for extra safety.
- [Warning] Buffer eviction race: Resolved. The new `writeAndDeleteRecords` cleanly separates buffer and store paths, avoiding the race between buffer eviction and batch write.
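
The `finally` suggestion for the TO_RETURN batch can be illustrated with a small sketch. It assumes handles are plain strings and that the per-message decision may throw; `ReturnBatchFlusher`, `filterAndFlush`, and the `batchSender` callback are hypothetical stand-ins for `filterPopResult` and `batchChangeInvisibleTime`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper, not part of RocketMQ.
final class ReturnBatchFlusher {
    /**
     * Accumulates handles that must be returned and sends them as one batch.
     * The send sits in a finally block, so handles collected before an
     * exception are still flushed; the exception then propagates to the caller.
     */
    static void filterAndFlush(List<String> handles,
                               Predicate<String> shouldReturn,
                               Consumer<List<String>> batchSender) {
        List<String> toReturn = new ArrayList<>();
        try {
            for (String handle : handles) {
                if (shouldReturn.test(handle)) { // may throw for a bad message
                    toReturn.add(handle);
                }
            }
        } finally {
            if (!toReturn.isEmpty()) {
                batchSender.accept(toReturn);
            }
        }
    }
}
```

If the loop already catches per-message exceptions, as the re-review states, the `finally` only matters for unexpected errors such as runtime exceptions thrown outside those catch blocks.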
Remaining Findings
- [Warning] `ConsumerProcessor.scheduleRenewTask`: No max batch size for renew. The renew path still collects all expiring handles in a group into a single `batchChangeInvisibleTime` call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this creates a very large batch. Consider capping the batch size (e.g., `proxyConfig.getBatchChangeInvisibleTimeMaxNum()`) and splitting into multiple calls.
- [Warning] `ChangeInvisibleTimeProcessor.processBatchRequestAsync`: Batch rejection vs. auto-split. When `requestEntries.size() > batchMaxNum`, the broker rejects the entire batch. If the proxy split threshold and broker max size are misconfigured, all entries fail. Consider auto-splitting at the broker level instead of rejecting, or at minimum documenting the expected relationship between `popBatchChangeInvisibleTimeMaxSize` (proxy) and `batchChangeInvisibleTimeMaxNum` (broker).
- [Info] `ChangeInvisibleTimeRequestEntry`: Good addition of a first-class remoting type. The `encode()`/`decode()` methods handle the serialization correctly. The `@JSONField` annotations ensure proper JSON mapping.
- [Info] `RemotingSerializableCompatTest`: Skipping `transient` fields in the compatibility check is correct. Transient fields are not part of the wire format.
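
The point about `transient` fields can be made concrete with a reflection filter. This is a sketch under the assumption that the compatibility check enumerates declared fields via reflection; the actual `RemotingSerializableCompatTest` code is not shown in this thread, and `WireFields` and `Sample` are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RocketMQ.
final class WireFields {
    /** Returns the names of declared fields that would take part in serialization. */
    static List<String> wireFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            // transient and static fields are not serialized; synthetic fields
            // are compiler- or tool-generated and not part of the declared type.
            if (Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers) || field.isSynthetic()) {
                continue;
            }
            names.add(field.getName());
        }
        return names;
    }

    // Illustrative type with one transient field.
    static final class Sample {
        String receiptHandle;
        long invisibleTime;
        transient Object decodedCache;
    }
}
```

Here `WireFields.wireFieldNames(WireFields.Sample.class)` contains `receiptHandle` and `invisibleTime` but not `decodedCache`; the order of names is not guaranteed by `getDeclaredFields()`.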
Cross-repo Note
This PR adds BATCH_CHANGE_MESSAGE_INVISIBLETIME as a new RequestCode. Ensure the RequestCode value does not conflict with any in-flight PRs in apache/rocketmq. The corresponding client-side support is included in this PR, so no cross-repo coordination is needed.
Automated review by github-manager-bot
[P1] Preserve input ordering when returning batch change-invisible-time results
When a renewal batch contains handles from different brokers/real topics, or contains an expired handle, a result can therefore be assigned to a different message. This may update message A with message B's receipt handle and cause subsequent renew/ack operations to target the wrong offset. Please preserve the original input order in the returned result list, or correlate each result with its …
        }
    }

    public void batchChangeInvisibilityDuration(List<ChangeInvisibleTimeRequestEntry> changeRecords) {

`changeInvisibilityDuration` can call `batchChangeInvisibilityDuration`.
        return;
    }
    for (CompletableFuture<List<BatchChangeInvisibleTimeResult>> resultFuture : futures) {
        batchResultList.addAll(resultFuture.join());

This changes the result ordering contract. batchResultList is built from expired handles first, then appends each broker/topic group using HashMap.values() iteration order. However, ReceiptHandleProcessor.batchChangeInvisibleTime consumes the returned list by index and completes futureList.get(indexes.get(i)) with results.get(i). If one renewal batch contains handles for multiple brokers, the result list can differ from the original handleMessageList order, causing a message to receive another message's renewed receipt handle. Please either preserve the original input order when building the result list, or have the caller map results back by BatchChangeInvisibleTimeResult.getReceiptHandleMessage() instead of by index.
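
The second option in this comment, mapping results back by key instead of by index, can be sketched as below. It assumes each result exposes a key that identifies its input and that keys are unique within one batch; strings stand in for `BatchChangeInvisibleTimeResult` and its receipt handle message, and `ResultReorderer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of RocketMQ.
final class ResultReorderer {
    /**
     * Returns the results rearranged to follow the order of the inputs.
     * Inputs without a matching result get null at their position.
     */
    static <I, R, K> List<R> reorder(List<I> inputs, List<R> results,
                                     Function<I, K> inputKey, Function<R, K> resultKey) {
        Map<K, R> byKey = new HashMap<>();
        for (R result : results) {
            byKey.put(resultKey.apply(result), result);
        }
        List<R> ordered = new ArrayList<>(inputs.size());
        for (I input : inputs) {
            ordered.add(byKey.get(inputKey.apply(input)));
        }
        return ordered;
    }
}
```

With this shape, an index-based consumer can still complete the future at position `i` with the element at position `i`, regardless of the order in which expired handles and per-broker groups were appended.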
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after new commit)
Summary
One new commit since the previous review:
- Fix test flake: Reorders mock setup before `start()` in `ReceiptHandleProcessorTest.testStart()`, properly mocks `changeInvisibleTime` to return a completed `CompletableFuture<AckResult>`, and adds try/finally cleanup.
New Commit Assessment
- [Info] `ReceiptHandleProcessorTest.testStart`: Good fix. The previous ordering (`start()` then `addReceiptHandle()`) had a race where the background thread could attempt processing before the mock was configured. The new ordering (mock setup, `addReceiptHandle()`, then `start()`) eliminates the race. The try/finally cleanup (`removeReceiptHandle` + `shutdown`) is also a good practice to avoid test interference.
Previous Findings — No Change
All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.
No new issues found in this commit.
Automated review by github-manager-bot
Fixes #10489
Summary
`enableBatchChangeInvisibleTime`.
Tests
- `mvn -pl common,remoting,client,broker,proxy -DskipTests compile test-compile`
- `mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=BatchChangeInvisibleTimeTest,MQClientAPIImplTest#testBatchChangeInvisibleTimeAsyncSendsRequestHeader+testProcessBatchChangeInvisibleTimeResponse,ChangeInvisibleTimeProcessorTest#testProcessBatchRequestRejectsMismatchedTopicOrGroup+testProcessBatchKvRequestDoesNotBuildSingleHeader+testProcessBatchRequestConvertsBadEntryToPerEntryFailure+testProcessBatchRequestRejectsOversizedBody,PopConsumerCacheTest,PopConsumerRocksdbStoreTest,ConsumerProcessorTest#testBatchChangeInvisibleTimeSplitByRealTopic+testBatchChangeInvisibleTime+testBatchChangeInvisibleTimeSplitOversizedBrokerGroup,ReceiveMessageResponseStreamWriterTest#testWriteBackFailureShouldBatchChangeInvisibleTime,ReceiptHandleProcessorTest#testBatchRenewMessage+testBatchClearGroup test`
- `mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=ReceiveMessageResponseStreamWriterTest,ReceiptHandleProcessorTest test`