From 6fd9ebc98ce3fa70187bad474e993ad4e9fd196f Mon Sep 17 00:00:00 2001 From: David Ho Date: Tue, 19 May 2026 16:53:26 -0700 Subject: [PATCH] Propagate error to active chunk subscriber in SplittingPublisher --- .../bugfix-AWSSDKforJavav2-773e2a6.json | 6 ++ .../NonRetryableSubAsyncRequestBody.java | 1 + .../async/RetryableSubAsyncRequestBody.java | 5 ++ .../internal/async/SplittingPublisher.java | 1 + .../internal/async/SubAsyncRequestBody.java | 5 ++ .../async/SplittingPublisherTest.java | 69 +++++++++++++++++++ 6 files changed, 87 insertions(+) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json b/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json new file mode 100644 index 000000000000..9a9915c0a674 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fixed an issue where `AsyncRequestBody.split()` did not propagate upstream errors to the in-progress chunk subscriber" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java index 221f9246a5e6..11af0474492d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java @@ -97,6 +97,7 @@ public int partNumber() { return partNumber; } + @Override public void error(Throwable error) { delegate.error(error); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java index 15f8b0199107..3951c4ca6d79 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java @@ -108,6 +108,11 @@ public long maxLength() { return configuration.maxLength(); } + @Override + public void error(Throwable error) { + delegate.error(error); + } + @Override public long receivedBytesLength() { return bufferedLength; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index d4b58b0285e3..82e749c14cc4 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -250,6 +250,7 @@ public void onComplete() { @Override public void onError(Throwable t) { log.debug(() -> "Received onError()", t); + currentBody.error(t); downstreamPublisher.error(t); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java index 6d3ec1b979a6..5f68b64a83c9 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java @@ -39,6 +39,11 @@ public interface SubAsyncRequestBody extends CloseableAsyncRequestBody { */ void complete(); + /** + * Signal that the stream has terminated with an error. No more {@link #send(ByteBuffer)} calls will be made. + */ + void error(Throwable error); + /** * The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this should be * the same as receivedBytesLength diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 87ac0b4726f4..39326ac4167b 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -229,6 +229,75 @@ void retryableSubAsyncRequestBodyEnabled_shouldBeAbleToResubscribe() throws Exec } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void upstreamError_shouldPropagateToCurrentBodySubscriber(boolean enableRetryableSubAsyncRequestBody) throws Exception { + RuntimeException upstreamError = new RuntimeException("upstream failure"); + AsyncRequestBody errorBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of(20L); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private int calls = 0; + + @Override + public void request(long n) { + if (calls++ == 0) { + // Send partial data, then error + s.onNext(ByteBuffer.wrap(new byte[3])); + } else { + s.onError(upstreamError); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + SplittingPublisher splittingPublisher = + SplittingPublisher.builder() + .asyncRequestBody(errorBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(enableRetryableSubAsyncRequestBody) + .build(); + + CompletableFuture bodyError = new CompletableFuture<>(); + splittingPublisher.subscribe(requestBody -> { + requestBody.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + } + + @Override + public void onError(Throwable t) { + bodyError.complete(t); + } + + @Override + public void onComplete() { + } + }); + }); + + Throwable error = bodyError.get(5, TimeUnit.SECONDS); + assertThat(error).isEqualTo(upstreamError); + } + private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody)