Skip to content

stream: deprecate Source.queue overloads that materialize SourceQueueWithComplete#3095

Open
He-Pin wants to merge 5 commits into
apache:mainfrom
He-Pin:deprecate-source-queue-overloads
Open

stream: deprecate Source.queue overloads that materialize SourceQueueWithComplete#3095
He-Pin wants to merge 5 commits into
apache:mainfrom
He-Pin:deprecate-source-queue-overloads

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 19, 2026

Copy link
Copy Markdown
Member

Motivation

The Source.queue(bufferSize, overflowStrategy) overloads (both Scala and Java DSLs) materialize a SourceQueueWithComplete whose asynchronous offer future can hang indefinitely under OverflowStrategy.backpressure when the buffer is full and downstream stalls. This is a long-standing trap that has caused real-world deadlocks (the same class of issue was documented upstream as akka/akka-core#29557, and more recently prompted akka/akka.net#8248 to add a cancellation-aware offer overload).

Pekko already provides a safer replacement:

  • Source.queue[T](bufferSize) materializes a BoundedSourceQueue with synchronous feedback (drops the newest element when the buffer is full, never hangs).
  • For backpressure scenarios, Source.actorRefWithBackpressure and MergeHub.source cover the common use cases.

The upstream Akka discussion in akka/akka-core#29801 ("Replace old Source.queue") has been open since 2021 without converging on a new backpressure primitive. We should not wait: the drop-new path (BoundedSourceQueue) is already stable and production-proven, and the backpressure path is covered by existing operators.

Modification

  1. API deprecation: @deprecated(since = "2.0.0") on the four Source.queue overloads that accept an OverflowStrategy (two scaladsl + two javadsl). Source.queue[T](bufferSize) overloads that return BoundedSourceQueue are not deprecated. SourceQueue / SourceQueueWithComplete traits are not deprecated (they are still referenced by the deprecated factory return types and by user code).

  2. Internal suppression: @nowarn("msg=deprecated") on the deprecated factories (so their delegation chain does not retrigger the warning under -Xfatal-warnings / -Werror) and on the two Pekko-internal call sites that intentionally exercise the deprecated API: InvokeWithFeedbackBenchmark and QueueSourceSpec.

  3. Docs: preserve the existing Source/queue.md prose (both the BoundedSourceQueue and the SourceQueue sections, both examples, all existing wording). Add:

    • A @@@ warning callout immediately after the operator summary pointing to the recommended API and the migration table.
    • A smaller warning directly above the deprecated Signature (SourceQueue) block.
    • A per-OverflowStrategy migration table appended at the bottom.
    • A deprecation notice in actor-interop.md's Source.queue subsection explaining that the referenced snippet has been updated.
    • Updated "See also" wording in Source/actorRefWithBackpressure.md ("BoundedSourceQueue" instead of "SourceQueue").
    • The Scala #source-queue snippet in IntegrationDocSpec.scala was fixed to use a direct match on QueueOfferResult. The previous snippet used queue.offer(x).map { case ... }, but BoundedSourceQueue.offer returns QueueOfferResult synchronously — QueueOfferResult has no .map method, so the snippet did not type-check against the API it was nominally demonstrating. The sibling #source-queue-synchronous Scala snippet had a misleading implicit val ec = system.dispatcher declaration that served no purpose (the synchronous match does not need an ExecutionContext); it was removed. Java #source-queue snippet was already consistent and is unchanged.

Result

  • Users calling Source.queue(...) with an OverflowStrategy get a compile-time deprecation warning pointing them to Source.queue(bufferSize) for drop-new, or Source.actorRefWithBackpressure / MergeHub.source for backpressure.
  • The hang trap is closed at the API level without breaking binary compatibility (the deprecated methods are unchanged, only annotated).
  • Pekko internal code is free of deprecation warnings under -Xfatal-warnings.
  • Docs surface the deprecation notice on the operator page, the actor-interop page, and the see-also links, while preserving all historical prose for users on older versions.
  • Both Scala #source-queue snippets type-check against BoundedSourceQueue.offer returning QueueOfferResult synchronously, match the Java counterpart semantically, and no longer declare unused imports.

Tests

  • sbt "stream / compile" — pass (213 Scala + 5 Java sources)
  • sbt "stream-tests / Test / compile" — pass (229 Scala + 32 Java sources)
  • sbt "bench-jmh / compile" — pass (98 Scala + 1 Java sources)
  • sbt "docs / Compile / managedResources" — pass (paradox StreamOperatorsIndexGenerator extracts the summary/category link successfully)
  • sbt "docs / Test / compile" — pass (224 Scala + 234 Java sources; updated #source-queue snippet type-checks against BoundedSourceQueue.offer returning QueueOfferResult; subsequent incremental recompile after the unused-ec removal also passed)

References

He-Pin added 3 commits June 20, 2026 01:39
…QueueWithComplete

Motivation:
Source.queue(bufferSize, overflowStrategy) (both scaladsl and javadsl)
materializes a SourceQueueWithComplete whose asynchronous offer future
can hang indefinitely under OverflowStrategy.backpressure when the
buffer is full and downstream stalls. The same class of issue has
caused real-world deadlocks (akka/akka-core#29557) and prompted
akka/akka.net#8248 to add a cancellation-aware offer overload.

Pekko already has a safer drop-new primitive, BoundedSourceQueue,
exposed via Source.queue(bufferSize), and backpressure is covered by
Source.actorRefWithBackpressure and MergeHub.source. Upstream Akka
has not converged on a new backpressure primitive in 4+ years
(akka/akka-core#29801 open since 2021), so we should close the trap
at the API surface now.

Modification:
Add @deprecated(since = "2.0.0") to the four Source.queue overloads
that accept an OverflowStrategy (two scaladsl + two javadsl). The
Source.queue[T](bufferSize) overloads that return BoundedSourceQueue
are left as the recommended path. SourceQueue / SourceQueueWithComplete
traits are not deprecated (they are still referenced by the deprecated
factory return types and by user code), so only the factory entry
points emit the warning.

Result:
Callers of Source.queue(Int, OverflowStrategy, ...) get a
compile-time warning pointing them to Source.queue(bufferSize) for
drop-new, or Source.actorRefWithBackpressure / MergeHub.source for
backpressure. The hang trap is closed at the API level without
breaking binary compatibility.

Tests:
Not run - API surface change only (annotation addition, no behavior
change). Deprecation annotations are enforced by the Scala/Java
compiler on every user compilation.

References:
Fixes apache#3094
Refs akka/akka-core#29801 (Replace old Source.queue - OPEN, stale since 2021)
Refs akka/akka-core#29557 (Async callback tracking OOM in Source.queue)
Refs akkadotnet/akka.net#8248 (Akka.NET cancellation-aware Source.Queue offer)
Motivation:
The previous commit added @deprecated to the four Source.queue
overloads that accept OverflowStrategy. Three internal call sites
still invoke those deprecated overloads, which trips -Xfatal-warnings
in Compile (scalac) and -Werror + -Xlint:deprecation (javac).

Modification:
- scaladsl.Source.queue(Int, OverflowStrategy): @nowarn("msg=deprecated")
  so its delegation to the 3-arg overload does not retrigger the
  warning.
- javadsl.Source.queue(Int, OverflowStrategy) and the 3-arg overload:
  @nowarn("msg=deprecated") for the same reason on the scaladsl
  delegation.
- InvokeWithFeedbackBenchmark: @nowarn on the class. The benchmark
  intentionally measures the deprecated backpressure path.
- QueueSourceSpec: @nowarn on the class. The spec exercises every
  deprecated overload to lock in behavior until the APIs are removed.

Result:
Compile path passes -Xfatal-warnings and -Werror again. User code
calling the deprecated factories still receives the warning.

Tests:
- sbt "stream / compile" — pass (213 Scala + 5 Java sources)
- sbt "stream-tests / Test / compile" — pass (229 Scala + 32 Java)
- sbt "bench-jmh / compile" — pass (98 Scala + 1 Java)

References:
Refs apache#3094
… deprecation path

Motivation:
The Source.queue overloads that accept an OverflowStrategy are now
deprecated since 2.0.0. The existing paradox page described both
SourceQueue and BoundedSourceQueue as first-class alternatives,
which no longer matches the recommended API surface.

The Scala #source-queue snippet was also silently calling the
deprecated path: BoundedSourceQueue.offer returns QueueOfferResult
synchronously, so queue.offer(x).map { case ... } cannot type-check
against a BoundedSourceQueue — the example in practice relied on
Source.queue(bufferSize, dropNew).

Modification:
- queue.md: lead with the non-deprecated BoundedSourceQueue
  overload, add a @@@ warning { } callout flagging the deprecated
  OverflowStrategy overloads, and add a migration table that maps
  each OverflowStrategy to a recommended replacement
  (BoundedSourceQueue, Source.actorRefWithBackpressure, or
  MergeHub.source).
- IntegrationDocSpec.scala: rewrite the Scala #source-queue snippet
  to match the Java counterpart — BoundedSourceQueue with a direct
  match on QueueOfferResult, no implicit ExecutionContext.
- operators/index.md: drop "or SourceQueue" from the one-line summary
  so the landing row reflects the non-deprecated API.

Result:
The docs page now points readers at the recommended API up-front,
the deprecated path is clearly flagged with a migration path, and
the Scala/Java examples are consistent (both BoundedSourceQueue).

Tests:
- sbt "docs / Test / compile" — pass (224 Scala + 234 Java sources;
  the updated #source-queue snippet type-checks against
  BoundedSourceQueue.offer returning QueueOfferResult)

References:
Refs apache#3094
@He-Pin He-Pin requested a review from pjfanning June 19, 2026 17:58
@He-Pin He-Pin added the t:stream Pekko Streams label Jun 19, 2026
@He-Pin He-Pin requested a review from jrudolph June 19, 2026 17:58
@He-Pin He-Pin added this to the 2.0.0-M4 milestone Jun 19, 2026
…ee-also links

Motivation:
A reviewer pointed out that docs/src/main/paradox/stream/actor-interop.md
still describes the deprecated OverflowStrategy.backpressure path as the
recommended Source.queue usage, while the #source-queue snippet it
references was updated in the previous commit to use BoundedSourceQueue
with a synchronous match. The surrounding prose (lines 131-161) was
therefore inconsistent with the snippet — telling the reader "offer
returns a Future" while the snippet showed a direct match on
QueueOfferResult.

Separately, the "See also" entry for Source.queue in
operators/Source/actorRefWithBackpressure.md still referred to the
deprecated "SourceQueue" wording, even though the recommended path
now materializes a BoundedSourceQueue.

A second reviewer pass also flagged that the #source-queue-synchronous
Scala snippet still declared `implicit val ec = system.dispatcher` even
though the synchronous match on QueueOfferResult does not need an
ExecutionContext. This was misleading in an example that is explicitly
labeled "synchronous".

Modification:
- actor-interop.md: prepend a @@@ warning { } callout at the top of
  the Source.queue subsection flagging the deprecated API and
  explicitly noting that the referenced snippet has been updated to
  the non-deprecated path. Existing prose (lines 131-173) is left
  untouched so historical context is preserved.
- actorRefWithBackpressure.md: update the Source.queue "See also"
  entry wording from "SourceQueue" to "BoundedSourceQueue".
- queue.md: keep the original prose and structure intact (both
  BoundedSourceQueue and SourceQueue sections, both examples, all
  existing wording). Add a @@@ warning { } callout after the
  operator summary (post-index-extractor block) that points readers
  to the recommended API and to the migration table. Add a smaller
  warning directly above the deprecated Signature(SourceQueue)
  block. Append the per-OverflowStrategy migration table at the
  bottom.
- index.md: restored (no change vs HEAD~1) — the landing row still
  mentions both BoundedSourceQueue and SourceQueue.
- IntegrationDocSpec.scala: drop `implicit val ec = system.dispatcher`
  from the #source-queue-synchronous Scala snippet. The synchronous
  match on QueueOfferResult does not require an ExecutionContext.

Result:
Readers landing on actor-interop.md or the Source.queue page see a
clear deprecation notice before the legacy prose. Operators index
entry is unchanged. The SourceQueue subsection of queue.md still
documents the deprecated path for users on older versions, with an
explicit warning above it. The synchronous Scala snippet no longer
declares an unused ExecutionContext. No historical prose was removed.

Tests:
- sbt "docs / Compile / managedResources" — pass (paradox
  StreamOperatorsIndexGenerator extracts the summary/category link
  successfully)
- sbt "docs / Test / compile" — pass (1 Scala source incremental
  recompile after the ec removal; prior run had passed 224 Scala +
  234 Java sources)

References:
Refs apache#3094
@He-Pin He-Pin force-pushed the deprecate-source-queue-overloads branch from ebc5b14 to 44e3d67 Compare June 19, 2026 18:24
@pjfanning

Copy link
Copy Markdown
Member

doc build broken

[06-19 18:58:19.145] [error] Class not found for @apidoc[Source.actorRefWithBackpressure] (pattern Source\$actorRefWithBackpressure$) at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/actor-interop.md:133
[06-19 18:58:19.145] [error] The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @apidoc[Source.actorRefWithBackpressure] or @apidoc[MergeHub.source$]. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table.
[06-19 18:58:19.146] [error]                                                                                                                                                                                                                                                                                                                                                                                                                                   ^
[06-19 18:58:19.146] [error] Class not found for @apidoc[MergeHub.source$] (pattern MergeHub\$source$) at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/actor-interop.md:133
[06-19 18:58:19.146] [error] The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @apidoc[Source.actorRefWithBackpressure] or @apidoc[MergeHub.source$]. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table.
[06-19 18:58:19.146] [error]                                                                                                                                                                                                                                                                                                                                                                                                                                                                               ^
[06-19 18:58:19.146] [error] No matches found for apidoc query [and materialize a @apidoc[SourceQueueWithComplete]] at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:9
[06-19 18:58:19.147] [error] The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] (and materialize a @apidoc[SourceQueueWithComplete]) are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks.
[06-19 18:58:19.147] [error]                                             ^
[06-19 18:58:19.147] [error] No matches found for apidoc query [single imperative producer] at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:11
[06-19 18:58:19.147] [error] Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement.
[06-19 18:58:19.147] [error]                                                                                                                                                                                                   ^
[06-19 18:58:19.147] [error] No matches found for apidoc query [multiple producers] at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:11
[06-19 18:58:19.147] [error] Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement.
[06-19 18:58:19.147] [error]                                                                                                                                                                                                                                                                            ^
[06-19 18:58:19.147] [error] Class not found for @apidoc[BoundedSourceQueue.fail] (pattern BoundedSourceQueue\$fail$) at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:94
[06-19 18:58:19.147] [error] | `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call @apidoc[BoundedSourceQueue.fail] with a `BufferOverflowException`. |
[06-19 18:58:19.147] [error]                                                                                                            ^
[06-19 18:58:19.147] [error] No matches found for apidoc query [single imperative producer] at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:95
[06-19 18:58:19.147] [error] | `Source.queue(n, OverflowStrategy.backpressure)` | @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). |
[06-19 18:58:19.147] [error]                                                      ^
[06-19 18:58:19.147] [error] No matches found for apidoc query [multiple producers] at /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source/queue.md:95
[06-19 18:58:19.147] [error] | `Source.queue(n, OverflowStrategy.backpressure)` | @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). |
[06-19 18:58:19.147] [error]                                                                                                                               ^
[06-19 18:58:19.147] [error] Paradox failed with 8 errors

@deprecated(
"Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " +
"(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " +
"or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure but will go with the consensus.

Could we consider not deprecating and just being more persuasive in the docs?

I've already run into issues with the methods that we removed already in 2.0.0 and it being far from obvious what the replacement is. For me, if it's not broken, why remove it?
Having alternatives is great.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will cause OOM and FGC at $Work, the real bug, trust me ,I was using it and later migrate to the BoundedSourceQueue one by @jrudolph , the now it works very well, my system is the Taobao Live.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these messages all end up in the mailbox, if the system is under heavy load, the backpressure mechanism fails; you then see heap memory steadily rising, followed by Full GCs, and eventually the system crashes. I think it’s better to just remove it—how should I put it? The feature works fine under low load, but it’s riddled with bugs under high load. akkadotnet/akka.net#8248 @Aaronontheweb FYI

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is, since this has indeed caused problems in my experience, we could deprecate it first and then remove it after a few years once hardly anyone is using it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are other overflow stategies - should we consider doing something about OverflowStrategy .backpressure? We've already deprecated and removed OverflowStrategy.dropNew.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s futile, because messages must first enter the mailbox before policies can actually be executed; if the actor never gets scheduled, none of those policies take effect. Consequently, an Out-of-Memory (OOM) error occurs as the mailbox grows indefinitely. In my view, this design is fundamentally flawed—it poses serious production safety risks under high load and, quite simply, is prone to causing production incidents.

…tion docs

Motivation:
Paradox @apidoc directives only resolve class/object names. Method
references (Source.actorRefWithBackpressure, MergeHub.source$),
arbitrary text (single imperative producer), and parenthetical text
immediately following @apidoc[...] were all misparsed, causing the
docs/paradox build to fail in CI.

Modification:
- Replace @apidoc[Source.actorRefWithBackpressure] with @ref links to
  the existing operator page.
- Replace @apidoc[MergeHub.source$] with plain code (no operator page).
- Replace @apidoc[SourceQueueWithComplete] and @apidoc[BoundedSourceQueue.fail]
  with plain code (class in context / method reference).
- Restructure queue.md:9 to remove parentheses immediately after
  @apidoc[OverflowStrategy] that Paradox misread as anchor syntax.

Result:
sbt docs/paradox passes. CI paradoxMarkdownToHtml step succeeds.

Tests:
- sbt docs/paradox — pass

References:
Fixes CI failure in PR apache#3095
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

t:stream Pekko Streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Deprecate Source.queue overloads that materialize SourceQueueWithComplete

2 participants