Skip to content

[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024

Draft
void-ptr974 wants to merge 6 commits into
apache:masterfrom
void-ptr974:pending-acks-local-primitive-map
Draft

[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024
void-ptr974 wants to merge 6 commits into
apache:masterfrom
void-ptr974:pending-acks-local-primitive-map

Conversation

@void-ptr974

@void-ptr974 void-ptr974 commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Review shape

This draft keeps the complete PendingAcksMap storage experiment in one place so the end-to-end trade-off is visible. The merge path can be split into smaller PRs:

Step Scope
Size tracking Keep the O(1) PendingAcksMap.size() change separate. If #26019 lands first, this branch can keep the cached-size accounting with the new inner storage.
Packed value layout Encapsulate (remainingUnacked, stickyKeyHash) as one long and keep unpacking at API/callback boundaries.
Primitive map Add Long2LongMap / Long2LongOpenHashMap as a reusable primitive collection.
Pending ack storage Replace the inner per-ledger TreeMap<Long, IntIntPair> with primitive long -> long storage.
Cleanup index Add a per-ledger BitSet index to keep same-ledger prefix cleanup bounded without bringing back the inner TreeMap.

Motivation

PendingAcksMap is used on the Shared and Key_Shared consumer dispatch path to track entries that were sent but not fully acknowledged. The current shape is:

TreeMap<Long, TreeMap<Long, IntIntPair>>

That allocates boxed Long keys, inner TreeMap nodes, and IntIntPair values for every pending entry. With the default consumer settings, a normal receiver window is around receiverQueueSize=1000, while slow consumers can reach maxUnackedMessagesPerConsumer=50000 and a subscription can reach maxUnackedMessagesPerSubscription=200000. PendingAcksMap tracks entries, not individual messages, so batching reduces the entry count, but the per-entry overhead is still on the hot consumer path.

Design

The outer TreeMap is kept because removeAllUpTo(ledgerId, entryId) still needs ledger ordering and whole-ledger removal. The inner map does not need sorted iteration: exact-entry operations use (ledgerId, entryId), boundary cleanup checks entryId <= markDeleteEntryId, and replay ordering is rebuilt later by MessageRedeliveryController.

The new per-ledger bucket stores entries in Long2LongOpenHashMap, where the value packs remainingUnacked in the high 32 bits and stickyKeyHash in the low 32 bits. Signed sticky-key hashes round-trip through the packed value. Packed value 0 is valid, so lookups use an explicit not-found sentinel only after checking the map lookup path.

LedgerPendingAcks also keeps a BitSet entry-id index. The primitive map remains the source of truth; the BitSet only accelerates prefix cleanup inside the boundary ledger. The index is used for non-negative entry ids up to 1 << 20. If a bucket sees an entry id outside that indexed range, the index is cleared and cleanup falls back to scanning the primitive map for that bucket. That keeps the behavior correct for larger ledgers or custom configurations without allocating a ledger-sized bitmap.

Correctness coverage

Concern Coverage
Primitive map semantics Long2LongOpenHashMapTest.testZeroValueCanBeDistinguishedFromMissingKey and testRandomOperationsAgainstHashMap cover zero values, missing values, updates, removals, iteration, and randomized behavior against HashMap.
Packed value preservation PendingAcksMapTest.packedPendingAckFields_RoundTripThroughPublicAccessors, removeAllUpTo_PreservesPackedFieldsInCallback, and updateRemainingUnacked_PreservesPackedStickyKeyHash.
Inner map no longer sorted PendingAcksMapTest.removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMap and removeAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries.
BitSet cleanup index PendingAcksMapTest.removeAllUpTo_KeepsSameLedgerEntriesAfterMarkDelete, removeAllUpTo_HandlesLargeEntryGapsAfterFirstEntryRemoval, and removeAllUpTo_HandlesEntryIdsOutsideBitmapIndexRange.
Existing pending-ack operations Existing PendingAcksMapTest coverage still exercises add, contains, remove, removeAllUpTo, callbacks, forEach, forEachAndClear, and size behavior.

Benchmark

The benchmark class contains both oldProduction and production implementations, so each run compares the old and new layout in the same JMH invocation. Datasets are append-order: entries fill one ledger before rolling to the next ledger.

Short local run used for the tables below:

./gradlew :microbench:shadowJar

java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \
  'PendingAcksMapBenchmark\.(dispatchAndAckCycle|dispatchAckAndPartialAckCycle|removeAllUpTo)' \
  -p implementation=oldProduction,production \
  -p dataset=defaultUnacked50kEntries1Ledger,defaultUnacked50kEntries5Ledgers,defaultUnacked50kEntries10Ledgers,defaultUnacked50kEntries20Ledgers \
  -wi 1 -i 2 -w 1s -r 1s -f 1 -prof gc \
  -rf csv -rff /private/tmp/pending_acks_pr26024_bitset_1_5_10_20.csv

JMH version: 1.37. JVM: OpenJDK 25.0.2. The run is intentionally short for PR-sized signal; the benchmark is reproducible by increasing -wi, -i, or forks on the same command.

Steady-state dispatch/ack cycle, 50k pending entries:

Benchmark 1 ledger 5 ledgers 10 ledgers 20 ledgers
dispatchAndAckCycle 204.8 -> 192.9 ns/op (-5.8%) 181.1 -> 129.5 ns/op (-28.5%) 189.3 -> 172.8 ns/op (-8.7%) 265.9 -> 163.7 ns/op (-38.4%)
dispatchAckAndPartialAckCycle 216.2 -> 158.4 ns/op (-26.7%) 185.3 -> 126.1 ns/op (-32.0%) 178.5 -> 131.5 ns/op (-26.3%) 180.1 -> 122.1 ns/op (-32.2%)

For dispatchAndAckCycle, normalized allocation was roughly neutral at one ledger (108.2 -> 111.6 B/op) and lower once the pending window crossed ledger buckets (~111 B/op -> ~80 B/op).

Range cleanup, 50k pending entries:

Benchmark 1 ledger 5 ledgers 10 ledgers 20 ledgers
removeAllUpTo 788.7 us -> 1.903 ms (+141.3%) 572.1 -> 430.7 us (-24.7%) 533.4 -> 204.1 us (-61.7%) 487.6 -> 122.5 us (-74.9%)
removeAllUpToSameLedger, small prefix 43.2 -> 63.1 us (+46.2%) 41.0 -> 62.0 us (+51.3%) 37.9 -> 59.8 us (+57.9%) 36.8 -> 64.4 us (+74.8%)

The remaining trade-off is a single large ledger where the old inner TreeMap can remove a large sorted prefix very efficiently. The BitSet index avoids the much worse primitive-map full scan for small prefix cleanup, while multi-ledger removeAllUpTo benefits from cheaper whole-ledger removal and lower retained object count.

Object size

Measured locally with JOL 0.17 GraphLayout.totalSize() on OpenJDK 25.0.2. The helper source and JOL dependency are not included in this PR.

Entries / ledgers Old TreeMap Primitive map Primitive + BitSet Primitive + BitSet vs old BitSet overhead vs primitive
100 / 1 8.7 KiB 4.5 KiB 4.5 KiB -48.0% +88 B
1,000 / 1 86.1 KiB 34.2 KiB 34.4 KiB -60.0% +200 B
50,000 / 1 4.20 MiB 2.13 MiB 2.13 MiB -49.2% +8.1 KiB
50,000 / 5 4.18 MiB 1.33 MiB 1.34 MiB -68.0% +10.4 KiB
50,000 / 10 4.17 MiB 1.33 MiB 1.34 MiB -67.9% +10.7 KiB
50,000 / 20 4.14 MiB 1.33 MiB 1.34 MiB -67.6% +11.4 KiB
1,000 / 100 73.5 KiB 68.0 KiB 75.8 KiB +3.2% +7.8 KiB

The dense/default-window cases are net wins. The sparse many-ledger residual case is the bound where per-ledger primitive-map and BitSet objects dominate.

Capacity retention is the main memory caveat. The primitive map and BitSet are array-backed and do not shrink while the ledger bucket remains non-empty:

Scenario Old TreeMap Primitive + BitSet
Peak 50k entries 4.20 MiB 2.13 MiB
Peak 50k, then keep 1k in the same bucket 86.1 KiB 2.13 MiB
Peak 50k, then remove the whole ledger bucket 64 B 64 B

In normal mark-delete progress, completed ledger buckets are removed and their arrays are released. The retained-capacity case matters when a bucket grows large and then keeps a small tail for a long time.

Verification

  • ./gradlew :pulsar-common:test --tests org.apache.pulsar.common.util.collections.Long2LongOpenHashMapTest --max-workers=1 -PtestRetryCount=0 --rerun-tasks
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.PendingAcksMapTest --max-workers=1 -PtestRetryCount=0 --rerun-tasks
  • ./gradlew :microbench:shadowJar
  • JMH command listed above
  • Local JOL GraphLayout.totalSize() run against an out-of-tree helper

@void-ptr974 void-ptr974 marked this pull request as ready for review June 13, 2026 15:31
@void-ptr974

Copy link
Copy Markdown
Contributor Author

This PR intentionally does not include the O(1) PendingAcksMap.size() change from #26019. If #26019 is merged first, I will update this PR on top of it and keep the cached size accounting in the primitive-map implementation.

@void-ptr974 void-ptr974 marked this pull request as draft June 14, 2026 02:57
@lhotari

lhotari commented Jun 15, 2026

Copy link
Copy Markdown
Member

Let's wait until fastutil has been restored by #26032. There's also other optimizations which could be done after that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants