Preserve throttler tokens on failed writes#3097
Conversation
| val tokens = payload.length | ||
|
|
||
| @tailrec def tryConsume(currentBucket: ThrottleMode): Boolean = { | ||
| @tailrec def tryConsume(currentBucket: ThrottleMode): Option[(ThrottleMode, ThrottleMode)] = { |
There was a problem hiding this comment.
Use a dedicated class instead of tuple2
He-Pin
left a comment
There was a problem hiding this comment.
Review Summary
Verdict: Approve ✅
The fix correctly addresses the documented FIXME (#2825) where ThrottlerHandle.write consumed token bucket capacity even when the wrapped write returned false.
Correctness Analysis
-
tryConsumereturningOption[(ThrottleMode, ThrottleMode)]— Sound. Returns the(previousBucket, consumedBucket)pair needed for a precise CAS-based refund when no concurrent mutation occurred. -
refundTokensCAS loop — Correct three-way dispatch:currentBucket == consumedBucket→ full restore (no concurrent mutation)- Same
TokenBucketconfig → add backtokens, cap atcapacity - Mode changed (e.g.
SetThrottleto different mode) → no-op fallback
-
@tailrecretry on CAS failure — Correct for lock-free concurrent safety. -
Unthrottled/Blackholepaths —Blackholeshort-circuits before consume (unchanged).Unthrottledflows throughcase bucket @ _→tryConsumesucceeds → refund'scase _falls through to no-op (nothing to refund). Both correct. -
TOCTOU window between
tryConsumeandrefundTokens— Accepted tradeoff: if throttle mode changes (viaSetThrottle) during the window, the refund is lost. This is rare and benign.
Test Verification
ThrottlerHandleSpec— 1 test, passes ✅ThrottleModeSpec— 5 tests, passes ✅ThrottlerTransportAdapterSpec— 2 tests, passes ✅
Suggestions (non-blocking)
See inline comments for test coverage improvements.
He-Pin
left a comment
There was a problem hiding this comment.
welcome and please fix license header
|
|
||
| wrappedHandle.writeAttempts should ===(1) | ||
| handle.outboundThrottleMode.get() should ===(bucket) | ||
| } |
There was a problem hiding this comment.
Consider adding two more test cases to strengthen the regression coverage:
-
Successful write still consumes tokens — the success path changed from
BooleantoOption[(ThrottleMode, ThrottleMode)]. A test asserting that tokens are consumed and NOT refunded on a successful write would guard against a future regression that accidentally always-refunds. -
Unthrottledmode is unaffected — withUnthrottledas the default throttle mode, a test that writes withUnthrottledset and verifieswrappedHandle.write()is called (and returns the correct boolean) would cover the most common code path.
Example sketch:
"consume tokens on successful write" in {
val wrappedHandle = new TestHandle(writeResult = true)
val handle = ThrottlerHandle(wrappedHandle, system.deadLetters)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 0, nanoTimeOfLastSend = 0L, availableTokens = 100)
handle.outboundThrottleMode.set(bucket)
handle.write(ByteString("1234567890")) should ===(true)
wrappedHandle.writeAttempts should ===(1)
handle.outboundThrottleMode.get().asInstanceOf[TokenBucket].availableTokens should ===(90)
}
"pass through writes when Unthrottled" in {
val wrappedHandle = new TestHandle(writeResult = true)
val handle = ThrottlerHandle(wrappedHandle, system.deadLetters)
handle.write(ByteString("data")) should ===(true)
wrappedHandle.writeAttempts should ===(1)
}Non-blocking — the fix itself is correct. These tests would further harden the regression net.
| else | ||
| (currentBucket, previousBucket) match { | ||
| case (bucket: TokenBucket, previous: TokenBucket) | ||
| if bucket.capacity == previous.capacity && bucket.tokensPerSecond == previous.tokensPerSecond => |
There was a problem hiding this comment.
Minor observation: bucket.tokensPerSecond == previous.tokensPerSecond uses exact Double equality. This is safe here because both values originate from the same SetThrottle configuration (same TokenBucket instance lineage), so no floating-point drift is possible between them. If a future change were to compute tokensPerSecond independently, consider switching to a tolerance-based comparison.
The CAS retry loop and the three-way dispatch logic are sound. Nice fix for a long-standing FIXME.
1b9b37a to
5c5a91f
Compare
5c5a91f to
b259808
Compare
pjfanning
left a comment
There was a problem hiding this comment.
blocking until we have a CI run
Summary
ThrottlerHandleused to consume token-bucket capacity before delegating to the wrapped transport handle. If the wrapped handle returnedfalse, no write happened but the bucket stayed depleted.This restores or refunds the reserved token state when the wrapped write fails, while leaving the successful write and blackhole paths unchanged.
Tests
remote/testOnly org.apache.pekko.remote.transport.ThrottlerHandleSpec org.apache.pekko.remote.classic.transport.ThrottleModeSpec org.apache.pekko.remote.classic.transport.ThrottlerTransportAdapterSpecremote/headerCheckremote/scalafmtAll