[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024
Draft
void-ptr974 wants to merge 6 commits into
Draft
[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024void-ptr974 wants to merge 6 commits into
void-ptr974 wants to merge 6 commits into
Conversation
Contributor
Author
2 tasks
Member
|
Let's wait until fastutil has been restored by #26032. There's also other optimizations which could be done after that. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Review shape
This draft keeps the complete
PendingAcksMapstorage experiment in one place so the end-to-end trade-off is visible. The merge path can be split into smaller PRs:PendingAcksMap.size()change separate. If #26019 lands first, this branch can keep the cached-size accounting with the new inner storage.(remainingUnacked, stickyKeyHash)as onelongand keep unpacking at API/callback boundaries.Long2LongMap/Long2LongOpenHashMapas a reusable primitive collection.TreeMap<Long, IntIntPair>with primitivelong -> longstorage.BitSetindex to keep same-ledger prefix cleanup bounded without bringing back the innerTreeMap.Motivation
PendingAcksMapis used on the Shared and Key_Shared consumer dispatch path to track entries that were sent but not fully acknowledged. The current shape is:That allocates boxed
Longkeys, innerTreeMapnodes, andIntIntPairvalues for every pending entry. With the default consumer settings, a normal receiver window is aroundreceiverQueueSize=1000, while slow consumers can reachmaxUnackedMessagesPerConsumer=50000and a subscription can reachmaxUnackedMessagesPerSubscription=200000.PendingAcksMaptracks entries, not individual messages, so batching reduces the entry count, but the per-entry overhead is still on the hot consumer path.Design
The outer
TreeMapis kept becauseremoveAllUpTo(ledgerId, entryId)still needs ledger ordering and whole-ledger removal. The inner map does not need sorted iteration: exact-entry operations use(ledgerId, entryId), boundary cleanup checksentryId <= markDeleteEntryId, and replay ordering is rebuilt later byMessageRedeliveryController.The new per-ledger bucket stores entries in
Long2LongOpenHashMap, where the value packsremainingUnackedin the high 32 bits andstickyKeyHashin the low 32 bits. Signed sticky-key hashes round-trip through the packed value. Packed value0is valid, so lookups use an explicit not-found sentinel only after checking the map lookup path.LedgerPendingAcksalso keeps aBitSetentry-id index. The primitive map remains the source of truth; theBitSetonly accelerates prefix cleanup inside the boundary ledger. The index is used for non-negative entry ids up to1 << 20. If a bucket sees an entry id outside that indexed range, the index is cleared and cleanup falls back to scanning the primitive map for that bucket. That keeps the behavior correct for larger ledgers or custom configurations without allocating a ledger-sized bitmap.Correctness coverage
Long2LongOpenHashMapTest.testZeroValueCanBeDistinguishedFromMissingKeyandtestRandomOperationsAgainstHashMapcover zero values, missing values, updates, removals, iteration, and randomized behavior againstHashMap.PendingAcksMapTest.packedPendingAckFields_RoundTripThroughPublicAccessors,removeAllUpTo_PreservesPackedFieldsInCallback, andupdateRemainingUnacked_PreservesPackedStickyKeyHash.PendingAcksMapTest.removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMapandremoveAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries.PendingAcksMapTest.removeAllUpTo_KeepsSameLedgerEntriesAfterMarkDelete,removeAllUpTo_HandlesLargeEntryGapsAfterFirstEntryRemoval, andremoveAllUpTo_HandlesEntryIdsOutsideBitmapIndexRange.PendingAcksMapTestcoverage still exercises add, contains, remove,removeAllUpTo, callbacks,forEach,forEachAndClear, and size behavior.Benchmark
The benchmark class contains both
oldProductionandproductionimplementations, so each run compares the old and new layout in the same JMH invocation. Datasets are append-order: entries fill one ledger before rolling to the next ledger.Short local run used for the tables below:
./gradlew :microbench:shadowJar java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \ 'PendingAcksMapBenchmark\.(dispatchAndAckCycle|dispatchAckAndPartialAckCycle|removeAllUpTo)' \ -p implementation=oldProduction,production \ -p dataset=defaultUnacked50kEntries1Ledger,defaultUnacked50kEntries5Ledgers,defaultUnacked50kEntries10Ledgers,defaultUnacked50kEntries20Ledgers \ -wi 1 -i 2 -w 1s -r 1s -f 1 -prof gc \ -rf csv -rff /private/tmp/pending_acks_pr26024_bitset_1_5_10_20.csvJMH version: 1.37. JVM: OpenJDK 25.0.2. The run is intentionally short for PR-sized signal; the benchmark is reproducible by increasing
-wi,-i, or forks on the same command.Steady-state dispatch/ack cycle, 50k pending entries:
dispatchAndAckCycledispatchAckAndPartialAckCycleFor
dispatchAndAckCycle, normalized allocation was roughly neutral at one ledger (108.2 -> 111.6 B/op) and lower once the pending window crossed ledger buckets (~111 B/op -> ~80 B/op).Range cleanup, 50k pending entries:
removeAllUpToremoveAllUpToSameLedger, small prefixThe remaining trade-off is a single large ledger where the old inner
TreeMapcan remove a large sorted prefix very efficiently. TheBitSetindex avoids the much worse primitive-map full scan for small prefix cleanup, while multi-ledgerremoveAllUpTobenefits from cheaper whole-ledger removal and lower retained object count.Object size
Measured locally with JOL 0.17
GraphLayout.totalSize()on OpenJDK 25.0.2. The helper source and JOL dependency are not included in this PR.TreeMapThe dense/default-window cases are net wins. The sparse many-ledger residual case is the bound where per-ledger primitive-map and
BitSetobjects dominate.Capacity retention is the main memory caveat. The primitive map and
BitSetare array-backed and do not shrink while the ledger bucket remains non-empty:TreeMapIn normal mark-delete progress, completed ledger buckets are removed and their arrays are released. The retained-capacity case matters when a bucket grows large and then keeps a small tail for a long time.
Verification
./gradlew :pulsar-common:test --tests org.apache.pulsar.common.util.collections.Long2LongOpenHashMapTest --max-workers=1 -PtestRetryCount=0 --rerun-tasks./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.PendingAcksMapTest --max-workers=1 -PtestRetryCount=0 --rerun-tasks./gradlew :microbench:shadowJarGraphLayout.totalSize()run against an out-of-tree helper