stream: deprecate Source.queue overloads that materialize SourceQueueWithComplete#3095
stream: deprecate Source.queue overloads that materialize SourceQueueWithComplete#3095He-Pin wants to merge 5 commits into
Conversation
…QueueWithComplete Motivation: Source.queue(bufferSize, overflowStrategy) (both scaladsl and javadsl) materializes a SourceQueueWithComplete whose asynchronous offer future can hang indefinitely under OverflowStrategy.backpressure when the buffer is full and downstream stalls. The same class of issue has caused real-world deadlocks (akka/akka-core#29557) and prompted akka/akka.net#8248 to add a cancellation-aware offer overload. Pekko already has a safer drop-new primitive, BoundedSourceQueue, exposed via Source.queue(bufferSize), and backpressure is covered by Source.actorRefWithBackpressure and MergeHub.source. Upstream Akka has not converged on a new backpressure primitive in 4+ years (akka/akka-core#29801 open since 2021), so we should close the trap at the API surface now. Modification: Add @deprecated(since = "2.0.0") to the four Source.queue overloads that accept an OverflowStrategy (two scaladsl + two javadsl). The Source.queue[T](bufferSize) overloads that return BoundedSourceQueue are left as the recommended path. SourceQueue / SourceQueueWithComplete traits are not deprecated (they are still referenced by the deprecated factory return types and by user code), so only the factory entry points emit the warning. Result: Callers of Source.queue(Int, OverflowStrategy, ...) get a compile-time warning pointing them to Source.queue(bufferSize) for drop-new, or Source.actorRefWithBackpressure / MergeHub.source for backpressure. The hang trap is closed at the API level without breaking binary compatibility. Tests: Not run - API surface change only (annotation addition, no behavior change). Deprecation annotations are enforced by the Scala/Java compiler on every user compilation. References: Fixes apache#3094 Refs akka/akka-core#29801 (Replace old Source.queue - OPEN, stale since 2021) Refs akka/akka-core#29557 (Async callback tracking OOM in Source.queue) Refs akkadotnet/akka.net#8248 (Akka.NET cancellation-aware Source.Queue offer)
Motivation: The previous commit added @deprecated to the four Source.queue overloads that accept OverflowStrategy. Three internal call sites still invoke those deprecated overloads, which trips -Xfatal-warnings in Compile (scalac) and -Werror + -Xlint:deprecation (javac). Modification: - scaladsl.Source.queue(Int, OverflowStrategy): @nowarn("msg=deprecated") so its delegation to the 3-arg overload does not retrigger the warning. - javadsl.Source.queue(Int, OverflowStrategy) and the 3-arg overload: @nowarn("msg=deprecated") for the same reason on the scaladsl delegation. - InvokeWithFeedbackBenchmark: @nowarn on the class. The benchmark intentionally measures the deprecated backpressure path. - QueueSourceSpec: @nowarn on the class. The spec exercises every deprecated overload to lock in behavior until the APIs are removed. Result: Compile path passes -Xfatal-warnings and -Werror again. User code calling the deprecated factories still receives the warning. Tests: - sbt "stream / compile" — pass (213 Scala + 5 Java sources) - sbt "stream-tests / Test / compile" — pass (229 Scala + 32 Java) - sbt "bench-jmh / compile" — pass (98 Scala + 1 Java) References: Refs apache#3094
… deprecation path
Motivation:
The Source.queue overloads that accept an OverflowStrategy are now
deprecated since 2.0.0. The existing paradox page described both
SourceQueue and BoundedSourceQueue as first-class alternatives,
which no longer matches the recommended API surface.
The Scala #source-queue snippet was also silently calling the
deprecated path: BoundedSourceQueue.offer returns QueueOfferResult
synchronously, so queue.offer(x).map { case ... } cannot type-check
against a BoundedSourceQueue — the example in practice relied on
Source.queue(bufferSize, dropNew).
Modification:
- queue.md: lead with the non-deprecated BoundedSourceQueue
overload, add a @@@ warning { } callout flagging the deprecated
OverflowStrategy overloads, and add a migration table that maps
each OverflowStrategy to a recommended replacement
(BoundedSourceQueue, Source.actorRefWithBackpressure, or
MergeHub.source).
- IntegrationDocSpec.scala: rewrite the Scala #source-queue snippet
to match the Java counterpart — BoundedSourceQueue with a direct
match on QueueOfferResult, no implicit ExecutionContext.
- operators/index.md: drop "or SourceQueue" from the one-line summary
so the landing row reflects the non-deprecated API.
Result:
The docs page now points readers at the recommended API up-front,
the deprecated path is clearly flagged with a migration path, and
the Scala/Java examples are consistent (both BoundedSourceQueue).
Tests:
- sbt "docs / Test / compile" — pass (224 Scala + 234 Java sources;
the updated #source-queue snippet type-checks against
BoundedSourceQueue.offer returning QueueOfferResult)
References:
Refs apache#3094
…ee-also links
Motivation:
A reviewer pointed out that docs/src/main/paradox/stream/actor-interop.md
still describes the deprecated OverflowStrategy.backpressure path as the
recommended Source.queue usage, while the #source-queue snippet it
references was updated in the previous commit to use BoundedSourceQueue
with a synchronous match. The surrounding prose (lines 131-161) was
therefore inconsistent with the snippet — telling the reader "offer
returns a Future" while the snippet showed a direct match on
QueueOfferResult.
Separately, the "See also" entry for Source.queue in
operators/Source/actorRefWithBackpressure.md still referred to the
deprecated "SourceQueue" wording, even though the recommended path
now materializes a BoundedSourceQueue.
A second reviewer pass also flagged that the #source-queue-synchronous
Scala snippet still declared `implicit val ec = system.dispatcher` even
though the synchronous match on QueueOfferResult does not need an
ExecutionContext. This was misleading in an example that is explicitly
labeled "synchronous".
Modification:
- actor-interop.md: prepend a @@@ warning { } callout at the top of
the Source.queue subsection flagging the deprecated API and
explicitly noting that the referenced snippet has been updated to
the non-deprecated path. Existing prose (lines 131-173) is left
untouched so historical context is preserved.
- actorRefWithBackpressure.md: update the Source.queue "See also"
entry wording from "SourceQueue" to "BoundedSourceQueue".
- queue.md: keep the original prose and structure intact (both
BoundedSourceQueue and SourceQueue sections, both examples, all
existing wording). Add a @@@ warning { } callout after the
operator summary (post-index-extractor block) that points readers
to the recommended API and to the migration table. Add a smaller
warning directly above the deprecated Signature(SourceQueue)
block. Append the per-OverflowStrategy migration table at the
bottom.
- index.md: restored (no change vs HEAD~1) — the landing row still
mentions both BoundedSourceQueue and SourceQueue.
- IntegrationDocSpec.scala: drop `implicit val ec = system.dispatcher`
from the #source-queue-synchronous Scala snippet. The synchronous
match on QueueOfferResult does not require an ExecutionContext.
Result:
Readers landing on actor-interop.md or the Source.queue page see a
clear deprecation notice before the legacy prose. Operators index
entry is unchanged. The SourceQueue subsection of queue.md still
documents the deprecated path for users on older versions, with an
explicit warning above it. The synchronous Scala snippet no longer
declares an unused ExecutionContext. No historical prose was removed.
Tests:
- sbt "docs / Compile / managedResources" — pass (paradox
StreamOperatorsIndexGenerator extracts the summary/category link
successfully)
- sbt "docs / Test / compile" — pass (1 Scala source incremental
recompile after the ec removal; prior run had passed 224 Scala +
234 Java sources)
References:
Refs apache#3094
ebc5b14 to
44e3d67
Compare
|
doc build broken |
| @deprecated( | ||
| "Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + | ||
| "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + | ||
| "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", |
There was a problem hiding this comment.
I'm not 100% sure but will go with the consensus.
Could we consider not deprecating and just being more persuasive in the docs?
I've already run into issues with the methods that we removed already in 2.0.0 and it being far from obvious what the replacement is. For me, if it's not broken, why remove it?
Having alternatives is great.
There was a problem hiding this comment.
it will cause OOM and FGC at $Work, the real bug, trust me ,I was using it and later migrate to the BoundedSourceQueue one by @jrudolph , the now it works very well, my system is the Taobao Live.
There was a problem hiding this comment.
Since these messages all end up in the mailbox, if the system is under heavy load, the backpressure mechanism fails; you then see heap memory steadily rising, followed by Full GCs, and eventually the system crashes. I think it’s better to just remove it—how should I put it? The feature works fine under low load, but it’s riddled with bugs under high load. akkadotnet/akka.net#8248 @Aaronontheweb FYI
There was a problem hiding this comment.
What I mean is, since this has indeed caused problems in my experience, we could deprecate it first and then remove it after a few years once hardly anyone is using it.
There was a problem hiding this comment.
there are other overflow stategies - should we consider doing something about OverflowStrategy .backpressure? We've already deprecated and removed OverflowStrategy.dropNew.
There was a problem hiding this comment.
It’s futile, because messages must first enter the mailbox before policies can actually be executed; if the actor never gets scheduled, none of those policies take effect. Consequently, an Out-of-Memory (OOM) error occurs as the mailbox grows indefinitely. In my view, this design is fundamentally flawed—it poses serious production safety risks under high load and, quite simply, is prone to causing production incidents.
…tion docs Motivation: Paradox @apidoc directives only resolve class/object names. Method references (Source.actorRefWithBackpressure, MergeHub.source$), arbitrary text (single imperative producer), and parenthetical text immediately following @apidoc[...] were all misparsed, causing the docs/paradox build to fail in CI. Modification: - Replace @apidoc[Source.actorRefWithBackpressure] with @ref links to the existing operator page. - Replace @apidoc[MergeHub.source$] with plain code (no operator page). - Replace @apidoc[SourceQueueWithComplete] and @apidoc[BoundedSourceQueue.fail] with plain code (class in context / method reference). - Restructure queue.md:9 to remove parentheses immediately after @apidoc[OverflowStrategy] that Paradox misread as anchor syntax. Result: sbt docs/paradox passes. CI paradoxMarkdownToHtml step succeeds. Tests: - sbt docs/paradox — pass References: Fixes CI failure in PR apache#3095
Motivation
The
Source.queue(bufferSize, overflowStrategy)overloads (both Scala and Java DSLs) materialize aSourceQueueWithCompletewhose asynchronousofferfuture can hang indefinitely underOverflowStrategy.backpressurewhen the buffer is full and downstream stalls. This is a long-standing trap that has caused real-world deadlocks (the same class of issue was documented upstream as akka/akka-core#29557, and more recently prompted akka/akka.net#8248 to add a cancellation-aware offer overload).Pekko already provides a safer replacement:
Source.queue[T](bufferSize)materializes aBoundedSourceQueuewith synchronous feedback (drops the newest element when the buffer is full, never hangs).Source.actorRefWithBackpressureandMergeHub.sourcecover the common use cases.The upstream Akka discussion in akka/akka-core#29801 ("Replace old Source.queue") has been open since 2021 without converging on a new backpressure primitive. We should not wait: the drop-new path (
BoundedSourceQueue) is already stable and production-proven, and the backpressure path is covered by existing operators.Modification
API deprecation:
@deprecated(since = "2.0.0")on the fourSource.queueoverloads that accept anOverflowStrategy(two scaladsl + two javadsl).Source.queue[T](bufferSize)overloads that returnBoundedSourceQueueare not deprecated.SourceQueue/SourceQueueWithCompletetraits are not deprecated (they are still referenced by the deprecated factory return types and by user code).Internal suppression:
@nowarn("msg=deprecated")on the deprecated factories (so their delegation chain does not retrigger the warning under-Xfatal-warnings/-Werror) and on the two Pekko-internal call sites that intentionally exercise the deprecated API:InvokeWithFeedbackBenchmarkandQueueSourceSpec.Docs: preserve the existing
Source/queue.mdprose (both theBoundedSourceQueueand theSourceQueuesections, both examples, all existing wording). Add:@@@ warningcallout immediately after the operator summary pointing to the recommended API and the migration table.Signature (SourceQueue)block.OverflowStrategymigration table appended at the bottom.actor-interop.md'sSource.queuesubsection explaining that the referenced snippet has been updated.Source/actorRefWithBackpressure.md("BoundedSourceQueue" instead of "SourceQueue").#source-queuesnippet inIntegrationDocSpec.scalawas fixed to use a directmatchonQueueOfferResult. The previous snippet usedqueue.offer(x).map { case ... }, butBoundedSourceQueue.offerreturnsQueueOfferResultsynchronously —QueueOfferResulthas no.mapmethod, so the snippet did not type-check against the API it was nominally demonstrating. The sibling#source-queue-synchronousScala snippet had a misleadingimplicit val ec = system.dispatcherdeclaration that served no purpose (the synchronousmatchdoes not need anExecutionContext); it was removed. Java#source-queuesnippet was already consistent and is unchanged.Result
Source.queue(...)with anOverflowStrategyget a compile-time deprecation warning pointing them toSource.queue(bufferSize)for drop-new, orSource.actorRefWithBackpressure/MergeHub.sourcefor backpressure.-Xfatal-warnings.#source-queuesnippets type-check againstBoundedSourceQueue.offerreturningQueueOfferResultsynchronously, match the Java counterpart semantically, and no longer declare unused imports.Tests
sbt "stream / compile"— pass (213 Scala + 5 Java sources)sbt "stream-tests / Test / compile"— pass (229 Scala + 32 Java sources)sbt "bench-jmh / compile"— pass (98 Scala + 1 Java sources)sbt "docs / Compile / managedResources"— pass (paradoxStreamOperatorsIndexGeneratorextracts the summary/category link successfully)sbt "docs / Test / compile"— pass (224 Scala + 234 Java sources; updated#source-queuesnippet type-checks againstBoundedSourceQueue.offerreturningQueueOfferResult; subsequent incremental recompile after the unused-ecremoval also passed)References