Skip to content

[ISSUE #10489] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes#10519

Open
qianye1001 wants to merge 10 commits into
apache:developfrom
qianye1001:codex/batch-change-invisible-time
Open

[ISSUE #10489] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes#10519
qianye1001 wants to merge 10 commits into
apache:developfrom
qianye1001:codex/batch-change-invisible-time

Conversation

@qianye1001

Copy link
Copy Markdown
Contributor

Fixes #10489

Summary

  • Add remoting/client/proxy support for BatchChangeInvisibleTime with a batch header scoped to one topic and consumer group for broker-side auth.
  • Add broker batch handling for PopConsumer KV change invisible time, including buffer-aware cache handling and delete-before-put RocksDB batch semantics.
  • Wire proxy batch call sites for receipt handle renew/cleanup, receive write-back failure nack, and filtered TO_RETURN messages, gated by enableBatchChangeInvisibleTime.

Tests

  • mvn -pl common,remoting,client,broker,proxy -DskipTests compile test-compile
  • mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=BatchChangeInvisibleTimeTest,MQClientAPIImplTest#testBatchChangeInvisibleTimeAsyncSendsRequestHeader+testProcessBatchChangeInvisibleTimeResponse,ChangeInvisibleTimeProcessorTest#testProcessBatchRequestRejectsMismatchedTopicOrGroup+testProcessBatchKvRequestDoesNotBuildSingleHeader+testProcessBatchRequestConvertsBadEntryToPerEntryFailure+testProcessBatchRequestRejectsOversizedBody,PopConsumerCacheTest,PopConsumerRocksdbStoreTest,ConsumerProcessorTest#testBatchChangeInvisibleTimeSplitByRealTopic+testBatchChangeInvisibleTime+testBatchChangeInvisibleTimeSplitOversizedBrokerGroup,ReceiveMessageResponseStreamWriterTest#testWriteBackFailureShouldBatchChangeInvisibleTime,ReceiptHandleProcessorTest#testBatchRenewMessage+testBatchClearGroup test
  • mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=ReceiveMessageResponseStreamWriterTest,ReceiptHandleProcessorTest test

@codecov-commenter

codecov-commenter commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 61.49068% with 310 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.23%. Comparing base (b4d92c0) to head (3c5cc1b).
⚠️ Report is 8 commits behind head on develop.

Files with missing lines Patch % Lines
...broker/processor/ChangeInvisibleTimeProcessor.java 50.60% 65 Missing and 16 partials ⚠️
...tmq/proxy/service/message/LocalMessageService.java 0.00% 60 Missing ⚠️
...he/rocketmq/proxy/processor/ConsumerProcessor.java 78.68% 19 Missing and 7 partials ⚠️
...cketmq/proxy/processor/ReceiptHandleProcessor.java 58.62% 19 Missing and 5 partials ⚠️
.../apache/rocketmq/test/client/rmq/RMQPopClient.java 0.00% 20 Missing ⚠️
...g/apache/rocketmq/client/impl/MQClientAPIImpl.java 64.00% 12 Missing and 6 partials ⚠️
...q/proxy/service/message/ClusterMessageService.java 0.00% 17 Missing ⚠️
...apache/rocketmq/broker/pop/PopConsumerService.java 63.15% 7 Missing and 7 partials ⚠️
...2/consumer/ReceiveMessageResponseStreamWriter.java 71.73% 11 Missing and 2 partials ⚠️
.../header/BatchChangeInvisibleTimeRequestHeader.java 0.00% 13 Missing ⚠️
... and 8 more
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10519      +/-   ##
=============================================
+ Coverage      48.12%   48.23%   +0.10%     
- Complexity     13352    13535     +183     
=============================================
  Files           1377     1383       +6     
  Lines         100720   101531     +811     
  Branches       13012    13111      +99     
=============================================
+ Hits           48476    48976     +500     
- Misses         46309    46553     +244     
- Partials        5935     6002      +67     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

This PR adds batch support for ChangeInvisibleTime across the full stack (remoting protocol → client → broker → proxy), reducing network round-trips and RocksDB flushes when multiple pop consumer records need invisible-time updates in the same batch. The feature is gated by enableBatchChangeInvisibleTime and falls back to per-entry processing when disabled.

Findings

  • [Critical] PopConsumerCache.writeAndDeleteRecordsBusy-wait spin lock: while (!consumerLockService.tryLock(...)) { Thread.yield(); } is a CPU-consuming spin loop. Under high contention (many concurrent batch operations on the same group/topic), this can waste significant CPU. Consider using a timed lock (e.g., tryLock with a timeout + retry with backoff) or a blocking lock abstraction.

  • [Critical] PopConsumerCache.writeAndDeleteRecordsLock topic derived from first record only: The lock topic is parsed from lockRecord which comes from validateAndGetLockRecord(writeRecordList, deleteRecordList). If the batch somehow contains records spanning different topics (despite the header-level validation), this could lead to incorrect locking. Please add an assertion or defensive check that all records in the batch share the same topic+group.

  • [Warning] ChangeInvisibleTimeProcessor.processBatchRequestBatch size splitting: The proxy splits oversized batches (splitOversizedBrokerGroup), but the broker-side processBatchRequest also checks popBatchChangeInvisibleTimeMaxSize. If the proxy split threshold and broker max size are misconfigured, the broker will reject the batch and all entries fail. Consider documenting the expected relationship between these two config values, or having the broker auto-split instead of rejecting.

  • [Warning] DefaultReceiptHandleManager.scheduleRenewTaskNo max batch size for renew: The renew path collects all expiring handles in a group into a single batch call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this could create a very large batch. Consider capping the batch size and splitting into multiple calls.

  • [Warning] ConsumerProcessor.filterPopResultTO_RETURN batch not flushed on error path: The toReturnMessageList is accumulated during the message loop and sent after the loop via batchChangeInvisibleTime. If an exception occurs during the loop (e.g., in the REJECT case), the accumulated TO_RETURN handles may not be processed. Consider wrapping the post-loop batch send in a finally block or ensuring it runs regardless of earlier exceptions.

  • [Warning] PopConsumerCache.writeAndDeleteRecordsBuffer eviction race: When records are in the in-memory buffer and a writeAndDeleteRecords is called, the method splits records into buffer vs. store paths. The buffer deletion path calls buffer.remove(record) while the store path adds to storeDeleteRecords. If a concurrent scanExpiredRecords promotes a record from buffer to store between the split and the actual store delete, the record could be missed. The lock should prevent this, but please verify the lock scope covers the full split+write+delete sequence.

  • [Info] PopConsumerRocksdbStore.writeAndDeleteRecordsDelete-before-put ordering: The WriteBatch processes all deletes before puts. This is correct for the case where the same key appears in both lists (old record deleted, new record written with different key due to changed popTime). The ordering is sound.

  • [Info] Good defensive handling of null entries in prepareBatchRequestEntries and processBatchChangeInvisibleTimeResponse. Each null entry gets a failure response without aborting the entire batch.

  • [Info] Test coverage is comprehensive — covers cache, RocksDB store, service layer, processor, client API, and proxy components. The split-by-broker and oversized-batch tests in the proxy are particularly valuable.

Suggestions

  1. Replace the Thread.yield() spin loop with a timed wait or CompletableFuture-based lock to avoid CPU waste under contention.
  2. Add a defensive assertion that all records in a batch share the same topic+group before acquiring the lock.
  3. Consider aligning or documenting the relationship between proxy batchChangeInvisibleTimeMaxSize and broker popBatchChangeInvisibleTimeMaxSize.
  4. Add a max batch size cap in the renew path to handle edge cases with large numbers of expiring handles.

Cross-repo Note

This PR adds new request/response protocol types (BatchChangeInvisibleTimeRequestHeader, BatchChangeInvisibleTimeRequestBody, etc.) in rocketmq-remoting. The rocketmq-clients repos do not need changes since the batch protocol is internal (broker ↔ proxy), not client-facing.


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (re-review)

Summary

Re-review after 2 new commits:

  1. 5df26c4 Fix single change invisible time KV write batch
  2. 76cc6b4 Serialize oversized batch change invisible time chunks

Changes Since Last Review

Commit 1 — Single-entry path fix: The changeInvisibilityDuration method now uses writeAndDeleteRecords for both the single-entry and batch paths, ensuring a single batched KV operation instead of separate write + delete calls. This is a clean improvement.

Commit 2 — Oversized batch chunking: The proxy now splits large handleMessageList into chunks of batchChangeInvisibleTimeMaxNum and processes them sequentially via thenCompose. This avoids overwhelming a single broker with many concurrent write-heavy batches. The fallback to per-item requests on failure is preserved.

Updated Findings

  • [Resolved] The single-entry path now correctly uses the batch API — good fix.
  • [Resolved] Oversized batches are now chunked and serialized — addresses the concern about large batch pressure.
  • [Info] PopConsumerCache.writeAndDeleteRecords — The spin-lock pattern (while (!tryLock) { Thread.yield(); }) is consistent with the existing deleteRecords method in the same class. While not ideal under high contention, this follows the established codebase convention. Acceptable for now; consider a timed backoff in a future cleanup.
  • [Positive] The validateAndGetLockRecord method correctly ensures all records share the same lock key, preventing cross-topic/group lock conflicts.
  • [Positive] Good test coverage for the new writeAndDeleteRecords method — tests cover empty deletes, buffered records, topic lock routing, and mixed lock key rejection.

Verdict

The two new commits meaningfully address the previous review findings. The implementation looks solid.


Automated re-review by github-manager-bot

public static final int BATCH_ACK_MESSAGE = 200151;
public static final int PEEK_MESSAGE = 200052;
public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
public static final int BATCH_CHANGE_MESSAGE_INVISIBLETIME = 200153;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest change

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

和 batch ack 一样的逻辑,就用这个吧

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (Re-review after 4 new commits)

Summary

Four new commits since the previous review:

  1. Refine batch entries — Introduces ChangeInvisibleTimeRequestEntry as a first-class remoting type with proper serialization
  2. Remove redundant validation — Removes the batch-level topic/group validation in PopConsumerCache (now enforced at the header level)
  3. Simplify write/delete — Replaces the busy-wait spin lock with a clean split-buffer/store approach
  4. Fix serializable compat — Skips transient fields in the reflection-based compatibility test

Previous Findings — Status

  • [Critical] Busy-wait spin lock in writeAndDeleteRecordsResolved. The spin lock is removed. The new implementation splits records into buffer vs. store paths and processes them independently. Much cleaner.

  • [Critical] Lock topic derived from first record onlyResolved. The lock mechanism is removed entirely. The batch header enforces topic+group consistency, and the split logic processes each record independently.

  • [Warning] Batch size splitting — Partially addressed. The broker now checks batchMaxNum and rejects oversized batches. The proxy splits oversized groups via splitOversizedBrokerGroup. However, see remaining concern below.

  • [Warning] TO_RETURN batch not flushed on error — Likely resolved. The loop catches per-message exceptions and continues, so the post-loop batch send should execute. But consider wrapping the post-loop batchChangeInvisibleTime call in a finally block for extra safety.

  • [Warning] Buffer eviction race — Resolved. The new writeAndDeleteRecords cleanly separates buffer and store paths, avoiding the race between buffer eviction and batch write.

Remaining Findings

  • [Warning] ConsumerProcessor.scheduleRenewTaskNo max batch size for renew. The renew path still collects all expiring handles in a group into a single batchChangeInvisibleTime call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this creates a very large batch. Consider capping the batch size (e.g., proxyConfig.getBatchChangeInvisibleTimeMaxNum()) and splitting into multiple calls.

  • [Warning] ChangeInvisibleTimeProcessor.processBatchRequestAsyncBatch rejection vs. auto-split. When requestEntries.size() > batchMaxNum, the broker rejects the entire batch. If the proxy split threshold and broker max size are misconfigured, all entries fail. Consider auto-splitting at the broker level instead of rejecting, or at minimum documenting the expected relationship between popBatchChangeInvisibleTimeMaxSize (proxy) and batchChangeInvisibleTimeMaxNum (broker).

  • [Info] ChangeInvisibleTimeRequestEntry — Good addition of a first-class remoting type. The encode()/decode() methods handle the serialization correctly. The @JSONField annotations ensure proper JSON mapping.

  • [Info] RemotingSerializableCompatTest — Skipping transient fields in the compatibility check is correct. Transient fields are not part of the wire format.

Cross-repo Note

This PR adds BATCH_CHANGE_MESSAGE_INVISIBLETIME as a new RequestCode. Ensure the RequestCode value does not conflict with any in-flight PRs in apache/rocketmq. The corresponding client-side support is included in this PR, so no cross-repo coordination is needed.


Automated review by github-manager-bot

@fuyou001

Copy link
Copy Markdown
Contributor

[P1] Preserve input ordering when returning batch change-invisible-time results

batchChangeInvisibleTime does not preserve the order of handleMessageList: expired-handle results are appended first, while the remaining results are appended later in HashMap group iteration order. However, ReceiptHandleProcessor.batchChangeInvisibleTime consumes the returned list by index and completes the future for indexes.get(i) with results.get(i).

When a renewal batch contains handles from different brokers/real topics, or contains an expired handle, a result can therefore be assigned to a different message. This may update message A with message B receipt handle and cause subsequent renew/ack operations to target the wrong offset.

Please preserve the original input order in the returned result list, or correlate each result with its ReceiptHandleMessage instead of consuming results positionally. A regression test should cover interleaved broker/topic groups and an expired handle.

}
}

public void batchChangeInvisibilityDuration(List<ChangeInvisibleTimeRequestEntry> changeRecords) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changeInvisibilityDuration can call batchChangeInvisibilityDuration

return;
}
for (CompletableFuture<List<BatchChangeInvisibleTimeResult>> resultFuture : futures) {
batchResultList.addAll(resultFuture.join());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the result ordering contract. batchResultList is built from expired handles first, then appends each broker/topic group using HashMap.values() iteration order. However, ReceiptHandleProcessor.batchChangeInvisibleTime consumes the returned list by index and completes futureList.get(indexes.get(i)) with results.get(i). If one renewal batch contains handles for multiple brokers, the result list can differ from the original handleMessageList order, causing a message to receive another message's renewed receipt handle. Please either preserve the original input order when building the result list, or have the caller map results back by BatchChangeInvisibleTimeResult.getReceiptHandleMessage() instead of by index.

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (Re-review after new commit)

Summary

One new commit since the previous review:

  • Fix test flake — Reorders mock setup before in , properly mocks to return a completed , and adds try/finally cleanup.

New Commit Assessment

  • [Info] — Good fix. The previous ordering ( → ) had a race where the background thread could attempt processing before the mock was configured. The new ordering (mock setup → → ) eliminates the race. The try/finally cleanup ( + ) is also a good practice to avoid test interference.

Previous Findings — No Change

All previous findings remain as stated in the last review. The remaining warnings about batch size capping in and broker-level batch rejection vs. auto-split are still applicable.

No new issues found in this commit.


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (Re-review after new commit)

Summary

One new commit since the previous review:

  • Fix test flake — Reorders mock setup before start() in ReceiptHandleProcessorTest.testStart(), properly mocks changeInvisibleTime to return a completed CompletableFuture<AckResult>, and adds try/finally cleanup.

New Commit Assessment

  • [Info] ReceiptHandleProcessorTest.testStart — Good fix. The previous ordering (start() then addReceiptHandle()) had a race where the background thread could attempt processing before the mock was configured. The new ordering (mock setup, addReceiptHandle(), then start()) eliminates the race. The try/finally cleanup (removeReceiptHandle + shutdown) is also a good practice to avoid test interference.

Previous Findings — No Change

All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.

No new issues found in this commit.


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes

6 participants