Skip to content

[ISSUE #10525] Reduce pull/dispatch path allocation via primitive arrays, ThreadLocal reuse, and lambda elimination#10526

Open
wang-jiahua wants to merge 1 commit into
apache:developfrom
wang-jiahua:perf/pull-dispatch-path-optimization
Open

[ISSUE #10525] Reduce pull/dispatch path allocation via primitive arrays, ThreadLocal reuse, and lambda elimination#10526
wang-jiahua wants to merge 1 commit into
apache:developfrom
wang-jiahua:perf/pull-dispatch-path-optimization

Conversation

@wang-jiahua

@wang-jiahua wang-jiahua commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Which Issue(s) This PR Fixes

Fixes #10525

Brief Description

Reduce allocation in the pull/dispatch path through five independent optimizations:

  1. GetMessageResult — Replace List<Long> with long[] + addQueueOffset(long) method. Eliminates Long boxing and ArrayList resize overhead. Right-sized initial capacity via constructor parameter (capped at 256 to prevent OOM).
  2. DispatchRequest — Change final fields to mutable + add reset() method for ThreadLocal reuse. Eliminates per-message DispatchRequest allocation in the dispatch hot path.
  3. DefaultMappedFile — Cache append slice and merge dual-slice operation in selectMappedBuffer. Eliminates redundant ByteBuffer.slice() allocation.
  4. DefaultMessageStore — Remove thenAccept callback lambda in putMessage/putMessages. Eliminates per-message closure allocation (captures this + beginTime).
  5. ConsumeQueue — Make topicQueueKey a final field to avoid per-call string computation.

How Did You Test This Change?

  1. Unit tests: All 62 tests pass (EscapeBridgeTest, PopConsumerServiceTest, PopLiteMessageProcessorTest).
  2. Compilation: store + broker modules compile cleanly on JDK 21.
  3. Commercial compatibility: No commercial classes extend GetMessageResult, DefaultMappedFile, DispatchRequest, ConsumeQueue, or DefaultMessageStore.
  4. A/B benchmark (256 threads, 1KB sync send, Dragonwell JDK 21, 6g heap):
Metric Baseline (develop) Optimized (S3) Change
TPS ~209k ~217k +3.8%
p99 RT 3ms 2ms -33%
Average RT 1.224ms 1.187ms -3.0%
Young GCs (total) 388 372 -4.1%

Note: S3's pull-path optimizations (GetMessageResult, DispatchRequest) are not fully exercised by a Producer-only benchmark. A Consumer benchmark would show additional improvement.

Copilot AI review requested due to automatic review settings June 17, 2026 15:00

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

This PR focuses on reducing allocations and improving I/O paths in the store layer by reusing buffers, adding a ByteBuffer-based FileChannel append API, and optimizing queue offset storage.

Changes:

  • Add appendMessageUsingFileChannel(ByteBuffer) to MappedFile and implement it in DefaultMappedFile
  • Optimize buffer usage (cached slice reuse; safer SelectMappedBufferResult slicing via duplicate() + explicit limits)
  • Replace GetMessageResult queue offsets from List<Long> to a primitive long[] with a lightweight List view and add addQueueOffset(...)

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java Adds a ByteBuffer overload for FileChannel-based appends
store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java Implements ByteBuffer FileChannel append; reuses append slices; adjusts buffer selection logic
store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java Switches queue offsets to primitive array + view; adds addQueueOffset
store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java Makes fields resettable; adds reset(...) and a package-private ctor
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java Simplifies async put methods; pre-sizes GetMessageResult with maxMsgNums
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java Reuses a precomputed topic-queue key; uses ByteBuffer FileChannel append
broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java Updates tests to use addQueueOffset
broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java Updates tests to use addQueueOffset
broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java Updates tests to use addQueueOffset

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
Comment thread store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
Comment thread store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

This PR reduces allocation in the pull/dispatch hot path through six optimizations: primitive long[] replacing List<Long> in GetMessageResult, DispatchRequest object reuse via reset(), cached topicQueueKey in ConsumeQueue, ByteBuffer-based FileChannel write in DefaultMappedFile, and removal of async callback in DefaultMessageStore. The direction is correct — eliminating autoboxing and buffer copies in high-throughput paths.

Findings

  • [Critical] DefaultMessageStore.java:620-630Removal of put-message stats tracking. The thenAccept callback that tracked setPutMessageEntireTimeMax(), getPutMessageFailedTimes(), and the slow-put warning log has been removed entirely. This eliminates observability for:

    1. Put message latency percentiles (used by monitoring/alerting)
    2. Failed put message counts (used for error rate tracking)
    3. Slow put warnings (>500ms threshold)

    The PR title says "Reduce pull/dispatch path allocation" but this removes stats from the put path. This appears to be scope creep or an unintended side effect. If the goal is to reduce allocation, consider keeping the stats tracking but using a more efficient mechanism (e.g., direct field assignment instead of lambda capture).

  • [Warning] DispatchRequest.java:45-70reset() method always sets success = true. The error constructor DispatchRequest(..., long tagsCode) sets success = false, but reset() unconditionally sets it to true. If the object pool reuses a failed DispatchRequest, the error state is silently lost. Consider either:

    1. Adding a success parameter to reset()
    2. Or ensuring failed requests are never reused (document this contract)
  • [Info] GetMessageResult.java:45-55 — The long[] array with AbstractList<Long> wrapper for backward compatibility is a good pattern. The growth formula size + (size >> 1) avoids the +1 in addQueueOffset, but since the default constructor initializes with new long[100], this is safe in practice.

  • [Info] DefaultMappedFile.java:200-210 — The cachedAppendSlice lazy initialization is not volatile/synchronized, but since the slice is deterministic (same underlying buffer), the worst case is one redundant allocation, which is benign.

Suggestions

  1. Restore or relocate the put-message stats tracking in DefaultMessageStore — this is a regression in observability.
  2. Clarify the DispatchRequest reuse contract regarding the success field.
  3. Consider adding a JMH benchmark for the ConsumeQueue.appendMessage path to quantify the improvement.

Automated review by github-manager-bot

…ve arrays, ThreadLocal reuse, and lambda elimination

- GetMessageResult: List<Long> -> long[] + addQueueOffset(long), right-sized capacity

- DispatchRequest: ThreadLocal reuse via mutable fields + reset()

- DefaultMappedFile: cached append slice + dual-slice merge in selectMappedBuffer

- DefaultMessageStore: remove thenAccept callback lambda in putMessage/putMessages

- ConsumeQueue: topicQueueKey final field

- Update 3 test files for GetMessageResult.addQueueOffset API
@wang-jiahua wang-jiahua force-pushed the perf/pull-dispatch-path-optimization branch from 4644074 to bcf8c40 Compare June 17, 2026 15:58
@codecov-commenter

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 65.67164% with 23 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.07%. Comparing base (59a70d9) to head (bcf8c40).
⚠️ Report is 4 commits behind head on develop.

Files with missing lines Patch % Lines
...va/org/apache/rocketmq/store/GetMessageResult.java 59.25% 9 Missing and 2 partials ⚠️
...ache/rocketmq/store/logfile/DefaultMappedFile.java 61.53% 9 Missing and 1 partial ⚠️
...ava/org/apache/rocketmq/store/DispatchRequest.java 77.77% 2 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10526      +/-   ##
=============================================
- Coverage      48.19%   48.07%   -0.12%     
+ Complexity     13394    13363      -31     
=============================================
  Files           1377     1377              
  Lines         100730   100779      +49     
  Branches       13012    13020       +8     
=============================================
- Hits           48542    48454      -88     
- Misses         46254    46378     +124     
- Partials        5934     5947      +13     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (Re-review after new commits)

Summary

Force-pushed update to the pull/dispatch path allocation reduction PR. The core optimizations remain: long[] replacing List<Long> in GetMessageResult, DispatchRequest ThreadLocal reuse, cached topicQueueKey, DefaultMappedFile buffer optimizations, and async callback removal in DefaultMessageStore. New: ByteBuffer-based appendMessageUsingFileChannel in ConsumeQueue and duplicate() instead of slice() in selectMappedBuffer.

Findings

  • [Critical] DefaultMessageStore.javaPut-message stats tracking still removed. The thenAccept callback containing setPutMessageEntireTimeMax(), getPutMessageFailedTimes(), and the slow-put warning log (>500ms) has been removed. This eliminates observability for put latency, failure counts, and slow-put alerts. This was flagged in the previous review and remains unaddressed. This is a regression in monitoring capability.

    Suggestion: Keep the stats tracking but avoid the lambda allocation. Options:

    1. Inline the stats tracking after the synchronous putMessage call (no lambda needed)
    2. Use a pre-allocated callback object stored in a field
    3. Move stats tracking to the caller of putMessage/putMessages
  • [Warning] ConsumeQueue.javabyteBufferIndex.flip() on shared buffer. The flip() call before appendMessageUsingFileChannel(this.byteBufferIndex) is correct for the write path (sets limit=position, position=0). However, byteBufferIndex is a shared field. If any code path reads from this buffer after the flip() without first resetting it, it will see stale data. Since the dispatch path is single-threaded per ConsumeQueue, this is safe in practice, but consider adding a comment documenting the buffer lifecycle contract.

  • [Info] DefaultMappedFile.java:selectMappedBuffer — Using duplicate() + limit() instead of slice() + slice() reduces one ByteBuffer allocation. The duplicate() shares the underlying data but has independent position/limit, which is correct here.

  • [Info] DefaultMappedFile.java:appendMessageUsingFileChannel(ByteBuffer) — New overload accepting ByteBuffer directly. The data.remaining() call correctly computes the write length. The flip() in the caller ensures the buffer is in read mode. This avoids the .array() call which would throw UnsupportedOperationException for direct buffers.

Unresolved from Previous Review

  • DispatchRequest.reset() always sets success = true — if a failed request is reused from the ThreadLocal pool, the error state is silently lost. Consider documenting that failed requests must not be reused, or adding a success parameter to reset().

Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Reduce pull/dispatch path allocation via primitive arrays, ThreadLocal reuse, and lambda elimination

4 participants