Preserve throttler tokens on failed writes#3097

Open
fallintoplace wants to merge 1 commit into apache:main from fallintoplace:fix/throttler-token-refund

@fallintoplace

Summary

ThrottlerHandle used to consume token-bucket capacity before delegating to the wrapped transport handle. If the wrapped handle returned false, no write happened but the bucket stayed depleted.

This restores or refunds the reserved token state when the wrapped write fails, while leaving the successful write and blackhole paths unchanged.
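The consume-then-refund behaviour described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `Bucket`, `RefundSketch`, and `wrappedWrite` are hypothetical stand-ins for Pekko's `TokenBucket`, `ThrottlerHandle`, and the wrapped transport handle, and only the uncontended refund case is shown.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object RefundSketch {
  // Hypothetical, simplified stand-in for a token bucket (not Pekko's TokenBucket).
  final case class Bucket(capacity: Int, available: Int)

  // Reserve `tokens` with a CAS loop. Returns (previous, consumed) on success,
  // or None when the bucket does not hold enough tokens.
  @tailrec
  def tryConsume(ref: AtomicReference[Bucket], tokens: Int): Option[(Bucket, Bucket)] = {
    val current = ref.get()
    if (current.available < tokens) None
    else {
      val consumed = current.copy(available = current.available - tokens)
      if (ref.compareAndSet(current, consumed)) Some((current, consumed))
      else tryConsume(ref, tokens)
    }
  }

  // Write through the throttle. If the wrapped write fails, undo our own reservation.
  def write(ref: AtomicReference[Bucket], tokens: Int, wrappedWrite: () => Boolean): Boolean =
    tryConsume(ref, tokens) match {
      case None => false
      case Some((previous, consumed)) =>
        val ok = wrappedWrite()
        // Uncontended case only: restore the bucket if nobody else touched it.
        if (!ok) { ref.compareAndSet(consumed, previous); () }
        ok
    }
}
```

With a full bucket of 100 tokens, a failed 10-token write in this sketch leaves the bucket at 100, while a successful one leaves it at 90.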

Tests

  • remote/testOnly org.apache.pekko.remote.transport.ThrottlerHandleSpec org.apache.pekko.remote.classic.transport.ThrottleModeSpec org.apache.pekko.remote.classic.transport.ThrottlerTransportAdapterSpec
  • remote/headerCheck
  • remote/scalafmtAll

val tokens = payload.length

- @tailrec def tryConsume(currentBucket: ThrottleMode): Boolean = {
+ @tailrec def tryConsume(currentBucket: ThrottleMode): Option[(ThrottleMode, ThrottleMode)] = {

Member

Use a dedicated class instead of a Tuple2.
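One way the suggestion might look is a small case class with named fields in place of the pair. This is only a sketch: `Reservation`, `Mode`, `Bucket`, and `reserve` are hypothetical names chosen for illustration, and the PR may pick a different shape.

```scala
object ReservationSketch {
  // Hypothetical, simplified stand-ins for ThrottleMode and its subtypes.
  sealed trait Mode
  case object Unthrottled extends Mode
  final case class Bucket(capacity: Int, available: Int) extends Mode

  // Named replacement for a (previous, consumed) tuple.
  final case class Reservation(previous: Mode, consumed: Mode)

  // Compute the reservation without mutating anything; None means "not enough tokens".
  def reserve(mode: Mode, tokens: Int): Option[Reservation] = mode match {
    case b: Bucket if b.available >= tokens =>
      Some(Reservation(previous = b, consumed = b.copy(available = b.available - tokens)))
    case _: Bucket   => None
    case Unthrottled => Some(Reservation(Unthrottled, Unthrottled))
  }
}
```

Named fields make call sites read as `r.previous` and `r.consumed` rather than `_1` and `_2`, which is easy to swap by accident when both elements have the same type.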

@He-Pin He-Pin left a comment

Member

Review Summary

Verdict: Approve

The fix correctly addresses the documented FIXME (#2825) where ThrottlerHandle.write consumed token bucket capacity even when the wrapped write returned false.

Correctness Analysis

  1. tryConsume returning Option[(ThrottleMode, ThrottleMode)] — Sound. Returns the (previousBucket, consumedBucket) pair needed for a precise CAS-based refund when no concurrent mutation occurred.

  2. refundTokens CAS loop — Correct three-way dispatch:

    • currentBucket == consumedBucket → full restore (no concurrent mutation)
    • Same TokenBucket config → add back tokens, cap at capacity
    • Mode changed (e.g. SetThrottle to different mode) → no-op fallback
  3. @tailrec retry on CAS failure — Correct for lock-free concurrent safety.

  4. Unthrottled / Blackhole paths: Blackhole short-circuits before consume (unchanged). Unthrottled flows through case bucket @ _ → tryConsume succeeds → refund's case _ falls through to a no-op (nothing to refund). Both correct.

  5. TOCTOU window between tryConsume and refundTokens — Accepted tradeoff: if throttle mode changes (via SetThrottle) during the window, the refund is lost. This is rare and benign.
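The three-way dispatch and CAS retry described in the list above might be sketched as follows. The types are simplified, hypothetical stand-ins rather than Pekko's `ThrottleMode` hierarchy, and the `refund` signature is an assumption made for illustration.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object RefundDispatchSketch {
  // Hypothetical, simplified stand-ins for ThrottleMode and its subtypes.
  sealed trait Mode
  case object Unthrottled extends Mode
  final case class Bucket(capacity: Int, tokensPerSecond: Double, available: Int) extends Mode

  @tailrec
  def refund(ref: AtomicReference[Mode], previous: Mode, consumed: Mode, tokens: Int): Unit = {
    val current = ref.get()
    val restored: Option[Mode] =
      if (current == consumed) Some(previous) // no concurrent mutation: full restore
      else
        (current, previous) match {
          // Same bucket configuration: add the tokens back, capped at capacity.
          case (b: Bucket, p: Bucket)
              if b.capacity == p.capacity && b.tokensPerSecond == p.tokensPerSecond =>
            Some(b.copy(available = math.min(b.capacity, b.available + tokens)))
          // Mode changed in the meantime: nothing sensible to refund.
          case _ => None
        }
    restored match {
      // Retry from a fresh read if another thread won the race.
      case Some(next) => if (!ref.compareAndSet(current, next)) refund(ref, previous, consumed, tokens)
      case None       => ()
    }
  }
}
```

The last branch is the accepted tradeoff from point 5: if the mode was replaced between consume and refund, the sketch drops the refund rather than guessing how to apply it to an unrelated configuration.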

Test Verification

  • ThrottlerHandleSpec — 1 test, passes ✅
  • ThrottleModeSpec — 5 tests, passes ✅
  • ThrottlerTransportAdapterSpec — 2 tests, passes ✅

Suggestions (non-blocking)

See inline comments for test coverage improvements.

@He-Pin He-Pin left a comment

Member

Welcome, and please fix the license header.


wrappedHandle.writeAttempts should ===(1)
handle.outboundThrottleMode.get() should ===(bucket)
}

Member

Consider adding two more test cases to strengthen the regression coverage:

  1. Successful write still consumes tokens — the success path changed from Boolean to Option[(ThrottleMode, ThrottleMode)]. A test asserting that tokens are consumed and NOT refunded on a successful write would guard against a future regression that accidentally always-refunds.

  2. Unthrottled mode is unaffected — with Unthrottled as the default throttle mode, a test that writes with Unthrottled set and verifies wrappedHandle.write() is called (and returns the correct boolean) would cover the most common code path.

Example sketch:

"consume tokens on successful write" in {
  val wrappedHandle = new TestHandle(writeResult = true)
  val handle = ThrottlerHandle(wrappedHandle, system.deadLetters)
  val bucket = TokenBucket(capacity = 100, tokensPerSecond = 0, nanoTimeOfLastSend = 0L, availableTokens = 100)
  handle.outboundThrottleMode.set(bucket)

  handle.write(ByteString("1234567890")) should ===(true)

  wrappedHandle.writeAttempts should ===(1)
  handle.outboundThrottleMode.get().asInstanceOf[TokenBucket].availableTokens should ===(90)
}

"pass through writes when Unthrottled" in {
  val wrappedHandle = new TestHandle(writeResult = true)
  val handle = ThrottlerHandle(wrappedHandle, system.deadLetters)

  handle.write(ByteString("data")) should ===(true)
  wrappedHandle.writeAttempts should ===(1)
}

Non-blocking — the fix itself is correct. These tests would further harden the regression net.

else
(currentBucket, previousBucket) match {
case (bucket: TokenBucket, previous: TokenBucket)
if bucket.capacity == previous.capacity && bucket.tokensPerSecond == previous.tokensPerSecond =>

Member

Minor observation: bucket.tokensPerSecond == previous.tokensPerSecond uses exact Double equality. This is safe here because both values originate from the same SetThrottle configuration (same TokenBucket instance lineage), so no floating-point drift is possible between them. If a future change were to compute tokensPerSecond independently, consider switching to a tolerance-based comparison.

The CAS retry loop and the three-way dispatch logic are sound. Nice fix for a long-standing FIXME.
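If a tolerance-based comparison were ever needed, as the observation above contemplates, it could look something like this. `sameRate` and its default relative tolerance are hypothetical and chosen only for illustration; the PR itself keeps exact equality.

```scala
object RateCompareSketch {
  // Hypothetical helper: treat two rates as equal when they differ by at most
  // a relative tolerance. The exact-equality check short-circuits the common case.
  def sameRate(a: Double, b: Double, relTol: Double = 1e-9): Boolean =
    a == b || math.abs(a - b) <= relTol * math.max(math.abs(a), math.abs(b))
}
```

For example, `0.1 + 0.2` and `0.3` differ under exact `Double` equality but compare as equal with this helper, while `10.0` and `10.1` remain distinct.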

@fallintoplace fallintoplace force-pushed the fix/throttler-token-refund branch from 1b9b37a to 5c5a91f June 19, 2026 20:07
@He-Pin He-Pin added this to the 2.0.0-M4 milestone Jun 19, 2026
@fallintoplace fallintoplace force-pushed the fix/throttler-token-refund branch from 5c5a91f to b259808 June 19, 2026 20:09

@pjfanning pjfanning left a comment

Member

blocking until we have a CI run

@pjfanning pjfanning left a comment

Member

build passed

@pjfanning pjfanning dismissed their stale review June 19, 2026 21:20

unblocking

3 participants