[ISSUE #10525] Reduce pull/dispatch path allocation via primitive arrays, ThreadLocal reuse, and lambda elimination#10526
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR focuses on reducing allocations and improving I/O paths in the store layer by reusing buffers, adding a ByteBuffer-based FileChannel append API, and optimizing queue offset storage.
Changes:
- Add
appendMessageUsingFileChannel(ByteBuffer)toMappedFileand implement it inDefaultMappedFile - Optimize buffer usage (cached slice reuse; safer
SelectMappedBufferResultslicing viaduplicate()+ explicit limits) - Replace
GetMessageResultqueue offsets fromList<Long>to a primitivelong[]with a lightweightListview and addaddQueueOffset(...)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java | Adds a ByteBuffer overload for FileChannel-based appends |
| store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java | Implements ByteBuffer FileChannel append; reuses append slices; adjusts buffer selection logic |
| store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java | Switches queue offsets to primitive array + view; adds addQueueOffset |
| store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java | Makes fields resettable; adds reset(...) and a package-private ctor |
| store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | Simplifies async put methods; pre-sizes GetMessageResult with maxMsgNums |
| store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | Reuses a precomputed topic-queue key; uses ByteBuffer FileChannel append |
| broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java | Updates tests to use addQueueOffset |
| broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java | Updates tests to use addQueueOffset |
| broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java | Updates tests to use addQueueOffset |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
This PR reduces allocation in the pull/dispatch hot path through six optimizations: primitive long[] replacing List<Long> in GetMessageResult, DispatchRequest object reuse via reset(), cached topicQueueKey in ConsumeQueue, ByteBuffer-based FileChannel write in DefaultMappedFile, and removal of async callback in DefaultMessageStore. The direction is correct — eliminating autoboxing and buffer copies in high-throughput paths.
Findings
-
[Critical]
DefaultMessageStore.java:620-630— Removal of put-message stats tracking. ThethenAcceptcallback that trackedsetPutMessageEntireTimeMax(),getPutMessageFailedTimes(), and the slow-put warning log has been removed entirely. This eliminates observability for:- Put message latency percentiles (used by monitoring/alerting)
- Failed put message counts (used for error rate tracking)
- Slow put warnings (>500ms threshold)
The PR title says "Reduce pull/dispatch path allocation" but this removes stats from the put path. This appears to be scope creep or an unintended side effect. If the goal is to reduce allocation, consider keeping the stats tracking but using a more efficient mechanism (e.g., direct field assignment instead of lambda capture).
-
[Warning]
DispatchRequest.java:45-70—reset()method always setssuccess = true. The error constructorDispatchRequest(..., long tagsCode)setssuccess = false, butreset()unconditionally sets it totrue. If the object pool reuses a failedDispatchRequest, the error state is silently lost. Consider either:- Adding a
successparameter toreset() - Or ensuring failed requests are never reused (document this contract)
- Adding a
-
[Info]
GetMessageResult.java:45-55— Thelong[]array withAbstractList<Long>wrapper for backward compatibility is a good pattern. The growth formulasize + (size >> 1)avoids the+1inaddQueueOffset, but since the default constructor initializes withnew long[100], this is safe in practice. -
[Info]
DefaultMappedFile.java:200-210— ThecachedAppendSlicelazy initialization is not volatile/synchronized, but since the slice is deterministic (same underlying buffer), the worst case is one redundant allocation, which is benign.
Suggestions
- Restore or relocate the put-message stats tracking in
DefaultMessageStore— this is a regression in observability. - Clarify the
DispatchRequestreuse contract regarding thesuccessfield. - Consider adding a JMH benchmark for the
ConsumeQueue.appendMessagepath to quantify the improvement.
Automated review by github-manager-bot
…ve arrays, ThreadLocal reuse, and lambda elimination - GetMessageResult: List<Long> -> long[] + addQueueOffset(long), right-sized capacity - DispatchRequest: ThreadLocal reuse via mutable fields + reset() - DefaultMappedFile: cached append slice + dual-slice merge in selectMappedBuffer - DefaultMessageStore: remove thenAccept callback lambda in putMessage/putMessages - ConsumeQueue: topicQueueKey final field - Update 3 test files for GetMessageResult.addQueueOffset API
4644074 to
bcf8c40
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #10526 +/- ##
=============================================
- Coverage 48.19% 48.07% -0.12%
+ Complexity 13394 13363 -31
=============================================
Files 1377 1377
Lines 100730 100779 +49
Branches 13012 13020 +8
=============================================
- Hits 48542 48454 -88
- Misses 46254 46378 +124
- Partials 5934 5947 +13 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (Re-review after new commits)
Summary
Force-pushed update to the pull/dispatch path allocation reduction PR. The core optimizations remain: long[] replacing List<Long> in GetMessageResult, DispatchRequest ThreadLocal reuse, cached topicQueueKey, DefaultMappedFile buffer optimizations, and async callback removal in DefaultMessageStore. New: ByteBuffer-based appendMessageUsingFileChannel in ConsumeQueue and duplicate() instead of slice() in selectMappedBuffer.
Findings
-
[Critical]
DefaultMessageStore.java— Put-message stats tracking still removed. ThethenAcceptcallback containingsetPutMessageEntireTimeMax(),getPutMessageFailedTimes(), and the slow-put warning log (>500ms) has been removed. This eliminates observability for put latency, failure counts, and slow-put alerts. This was flagged in the previous review and remains unaddressed. This is a regression in monitoring capability.Suggestion: Keep the stats tracking but avoid the lambda allocation. Options:
- Inline the stats tracking after the synchronous
putMessagecall (no lambda needed) - Use a pre-allocated callback object stored in a field
- Move stats tracking to the caller of
putMessage/putMessages
- Inline the stats tracking after the synchronous
-
[Warning]
ConsumeQueue.java—byteBufferIndex.flip()on shared buffer. Theflip()call beforeappendMessageUsingFileChannel(this.byteBufferIndex)is correct for the write path (setslimit=position, position=0). However,byteBufferIndexis a shared field. If any code path reads from this buffer after theflip()without first resetting it, it will see stale data. Since the dispatch path is single-threaded per ConsumeQueue, this is safe in practice, but consider adding a comment documenting the buffer lifecycle contract. -
[Info]
DefaultMappedFile.java:selectMappedBuffer— Usingduplicate()+limit()instead ofslice()+slice()reduces one ByteBuffer allocation. Theduplicate()shares the underlying data but has independent position/limit, which is correct here. -
[Info]
DefaultMappedFile.java:appendMessageUsingFileChannel(ByteBuffer)— New overload acceptingByteBufferdirectly. Thedata.remaining()call correctly computes the write length. Theflip()in the caller ensures the buffer is in read mode. This avoids the.array()call which would throwUnsupportedOperationExceptionfor direct buffers.
Unresolved from Previous Review
DispatchRequest.reset()always setssuccess = true— if a failed request is reused from the ThreadLocal pool, the error state is silently lost. Consider documenting that failed requests must not be reused, or adding asuccessparameter toreset().
Automated review by github-manager-bot
Which Issue(s) This PR Fixes
Fixes #10525
Brief Description
Reduce allocation in the pull/dispatch path through five independent optimizations:
GetMessageResult— ReplaceList<Long>withlong[]+addQueueOffset(long)method. EliminatesLongboxing andArrayListresize overhead. Right-sized initial capacity via constructor parameter (capped at 256 to prevent OOM).DispatchRequest— Changefinalfields to mutable + addreset()method for ThreadLocal reuse. Eliminates per-messageDispatchRequestallocation in the dispatch hot path.DefaultMappedFile— Cache append slice and merge dual-slice operation inselectMappedBuffer. Eliminates redundantByteBuffer.slice()allocation.DefaultMessageStore— RemovethenAcceptcallback lambda inputMessage/putMessages. Eliminates per-message closure allocation (capturesthis+beginTime).ConsumeQueue— MaketopicQueueKeyafinalfield to avoid per-call string computation.How Did You Test This Change?
EscapeBridgeTest,PopConsumerServiceTest,PopLiteMessageProcessorTest).store+brokermodules compile cleanly on JDK 21.GetMessageResult,DefaultMappedFile,DispatchRequest,ConsumeQueue, orDefaultMessageStore.