From 051f869bbf5f604a7fac5772fd848b57069f03bd Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Mon, 11 May 2026 16:04:23 -0700 Subject: [PATCH 1/2] Switch from the CRT pull-based HttpRequestBodyStream model to the push-based HttpStreamBase.writeData() API to avoid potential deadlock issue when request body InputStream blocks. --- .../bugfix-AWSCRTHTTPClient-aee08c2.json | 6 ++ .../awssdk/http/crt/AwsCrtHttpClient.java | 38 ++++++++- .../http/crt/internal/CrtRequestExecutor.java | 52 ++++++++++--- .../internal/request/CrtRequestAdapter.java | 9 +-- .../request/CrtRequestInputStreamAdapter.java | 78 ------------------- .../crt/internal/CrtRequestExecutorTest.java | 18 ++--- pom.xml | 2 +- 7 files changed, 94 insertions(+), 109 deletions(-) create mode 100644 .changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json delete mode 100644 http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json new file mode 100644 index 000000000000..f0cc6298ec3c --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 9c6d769e48fa..11ef6efcc655 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -18,13 +18,17 @@ import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.http.HttpException; +import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; @@ -35,6 +39,7 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.CrtUtils; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { } private static final class CrtHttpRequest implements ExecutableHttpRequest { + private static final int WRITE_BUFFER_SIZE = 16 * 1024; + private final CrtRequestContext context; private volatile CompletableFuture responseFuture; @@ -119,7 +126,14 @@ public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); try { - responseFuture = new CrtRequestExecutor().execute(context); + CrtRequestExecutor.ExecutionResult execution = new CrtRequestExecutor().execute(context); + responseFuture = execution.responseFuture(); + + // Wait for the stream to be acquired, then write the request body from the caller thread. + // This avoids blocking the CRT event loop thread in InputStream.read(). + HttpStreamBase stream = CompletableFutureUtils.joinInterruptibly(execution.streamFuture()); + writeRequestBody(stream); + SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); builder.response(response); builder.responseBody(response.content().orElse(null)); @@ -140,6 +154,10 @@ public HttpExecuteResponse call() throws IOException { } if (cause instanceof HttpException) { + Throwable wrapped = CrtUtils.wrapCrtException(cause); + if (wrapped instanceof IOException) { + throw (IOException) wrapped; + } throw (HttpException) cause; } @@ -151,6 +169,24 @@ public HttpExecuteResponse call() throws IOException { } } + private void writeRequestBody(HttpStreamBase stream) throws IOException { + ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null); + if (provider == null) { + return; + } + + try (InputStream inputStream = provider.newStream()) { + byte[] buf = new byte[WRITE_BUFFER_SIZE]; + int read; + + while ((read = inputStream.read(buf, 0, buf.length)) >= 0) { + byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read); + CompletableFutureUtils.joinInterruptibly(stream.writeData(chunk, false)); + } + CompletableFutureUtils.joinInterruptibly(stream.writeData(null, true)); + } + } + @Override public void abort() { if (responseFuture != null) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index 7165696435e1..7c764cf127df 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -32,37 +32,40 @@ @SdkInternalApi public final class CrtRequestExecutor { - public CompletableFuture execute(CrtRequestContext executionContext) { - CompletableFuture requestFuture = new CompletableFuture<>(); + public ExecutionResult execute(CrtRequestContext executionContext) { + CompletableFuture responseFuture = new CompletableFuture<>(); + CompletableFuture streamFuture; try { - doExecute(executionContext, requestFuture); + streamFuture = doExecute(executionContext, responseFuture); } catch (Throwable t) { - requestFuture.completeExceptionally(t); + responseFuture.completeExceptionally(t); + streamFuture = new CompletableFuture<>(); + streamFuture.completeExceptionally(t); } - return requestFuture; + return new ExecutionResult(streamFuture, responseFuture); } - private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { + private CompletableFuture doExecute(CrtRequestContext executionContext, + CompletableFuture responseFuture) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); long acquireStartTime = 0; if (shouldPublishMetrics) { - // go ahead and get acquireStartTime for the concurrency timer as early as possible, - // so it's as accurate as possible, but only do it in a branch since clock_gettime() - // results in a full sys call barrier (multiple mutexes and a hw interrupt). acquireStartTime = System.nanoTime(); } - HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(responseFuture); HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); + boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent(); + CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody); long finalAcquireStartTime = acquireStartTime; @@ -73,8 +76,33 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture streamFuture; + private final CompletableFuture responseFuture; + + ExecutionResult(CompletableFuture streamFuture, + CompletableFuture responseFuture) { + this.streamFuture = streamFuture; + this.responseFuture = responseFuture; + } + + public CompletableFuture streamFuture() { + return streamFuture; + } + + public CompletableFuture responseFuture() { + return responseFuture; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 8672d80b0d1b..e97b1adb514e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); String finalEncodedPath = encodedPath + encodedQueryString; - return sdkExecuteRequest.contentStreamProvider() - .map(provider -> new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, - new CrtRequestInputStreamAdapter(provider))) - .orElse(new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, null)); + return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null); } private static HttpHeader[] asArray(List crtHeaderList) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java deleted file mode 100644 index 68f418b9e1df..000000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal.request; - -import static java.lang.Math.min; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.ContentStreamProvider; - -@SdkInternalApi -final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream { - private static final int READ_BUFFER_SIZE = 16 * 1024; - - private final ContentStreamProvider provider; - private volatile InputStream providerStream; - private final byte[] readBuffer = new byte[READ_BUFFER_SIZE]; - - CrtRequestInputStreamAdapter(ContentStreamProvider provider) { - this.provider = provider; - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - int read; - - try { - if (providerStream == null) { - createNewStream(); - } - - int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining()); - read = providerStream.read(readBuffer, 0, toRead); - - if (read > 0) { - bodyBytesOut.put(readBuffer, 0, read); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return read < 0; - } - - @Override - public boolean resetPosition() { - try { - createNewStream(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return true; - } - - private void createNewStream() throws IOException { - if (providerStream != null) { - providerStream.close(); - } - providerStream = provider.newStream(); - } -} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 456000ac1150..87a2849edc7b 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -98,11 +98,11 @@ public void execute_acquireStreamFails_wrapsWithIOException() { CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -113,10 +113,10 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(throwable); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -130,10 +130,10 @@ public void execute_httpException_mapsToCorrectException(Entry completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @@ -143,10 +143,10 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/pom.xml b/pom.xml index 1b6bee34df5e..1848cf1dd892 100644 --- a/pom.xml +++ b/pom.xml @@ -131,7 +131,7 @@ 3.1.5 1.17.1 1.37 - 0.45.1 + 1.0.0-SNAPSHOT 5.10.3 From 2d49203b505d4761b8459e4ed8f97c407d16dec8 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 13 May 2026 13:02:32 -0700 Subject: [PATCH 2/2] Extract CrtStreamHandler to manage stream lifecycle Move stream lifecycle operations (writeData, incrementWindow, releaseConnection, closeConnection) from ResponseHandlerHelper into a dedicated CrtStreamHandler class. This separates header parsing from stream management and makes the shared stream guard explicit between the request executor and response handler. ResponseHandlerHelper now only handles response header parsing. --- .../awssdk/http/crt/AwsCrtHttpClient.java | 20 +-- .../crt/internal/CrtAsyncRequestExecutor.java | 6 +- .../http/crt/internal/CrtRequestExecutor.java | 46 ++----- .../http/crt/internal/CrtStreamHandler.java | 117 +++++++++++++++++ .../internal/response/CrtResponseAdapter.java | 30 +++-- ...reamAdaptingHttpStreamResponseHandler.java | 24 ++-- .../response/ResponseHandlerHelper.java | 44 ------- .../BaseHttpStreamResponseHandlerTest.java | 44 ++++--- .../crt/internal/CrtRequestExecutorTest.java | 10 +- .../crt/internal/CrtResponseHandlerTest.java | 12 +- .../crt/internal/CrtStreamHandlerTest.java | 90 +++++++++++++ ...AdaptingHttpStreamResponseHandlerTest.java | 22 ++-- .../internal/ResponseHandlerHelperTest.java | 124 ------------------ 13 files changed, 320 insertions(+), 269 deletions(-) create mode 100644 http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java create mode 100644 http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java delete mode 100644 http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 11ef6efcc655..d81c286fc5f4 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -26,7 +26,6 @@ import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.http.HttpException; -import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.ExecutableHttpRequest; @@ -39,6 +38,7 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.CrtUtils; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -126,13 +126,13 @@ public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); try { - CrtRequestExecutor.ExecutionResult execution = new CrtRequestExecutor().execute(context); - responseFuture = execution.responseFuture(); + CrtStreamHandler streamHandler = new CrtStreamHandler(); + responseFuture = new CrtRequestExecutor().execute(context, streamHandler); - // Wait for the stream to be acquired, then write the request body from the caller thread. + // Write the request body from the caller thread via the stream handler, + // which guards against concurrent stream close with a synchronized block. // This avoids blocking the CRT event loop thread in InputStream.read(). - HttpStreamBase stream = CompletableFutureUtils.joinInterruptibly(execution.streamFuture()); - writeRequestBody(stream); + writeRequestBody(streamHandler); SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); builder.response(response); @@ -169,21 +169,21 @@ public HttpExecuteResponse call() throws IOException { } } - private void writeRequestBody(HttpStreamBase stream) throws IOException { + private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException { ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null); if (provider == null) { return; } + streamHandler.waitForStream(); try (InputStream inputStream = provider.newStream()) { byte[] buf = new byte[WRITE_BUFFER_SIZE]; int read; - while ((read = inputStream.read(buf, 0, buf.length)) >= 0) { byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read); - CompletableFutureUtils.joinInterruptibly(stream.writeData(chunk, false)); + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false)); } - CompletableFutureUtils.joinInterruptibly(stream.writeData(null, true)); + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true)); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 7629683899a5..e3dfe024e9cd 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -67,8 +67,10 @@ private void doExecute(CrtAsyncRequestContext executionContext, HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext); + CrtStreamHandler streamHandler = new CrtStreamHandler(); + HttpStreamBaseResponseHandler crtResponseHandler = - CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler()); + CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler); CompletableFuture streamFuture = executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); @@ -83,6 +85,8 @@ private void doExecute(CrtAsyncRequestContext executionContext, if (throwable != null) { Throwable toThrow = wrapCrtException(throwable); reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler()); + } else { + streamHandler.setStream(stream); } }); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index 7c764cf127df..98ace2a65766 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -22,7 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -32,23 +31,22 @@ @SdkInternalApi public final class CrtRequestExecutor { - public ExecutionResult execute(CrtRequestContext executionContext) { + public CompletableFuture execute(CrtRequestContext executionContext, + CrtStreamHandler streamHandler) { CompletableFuture responseFuture = new CompletableFuture<>(); - CompletableFuture streamFuture; try { - streamFuture = doExecute(executionContext, responseFuture); + doExecute(executionContext, responseFuture, streamHandler); } catch (Throwable t) { responseFuture.completeExceptionally(t); - streamFuture = new CompletableFuture<>(); - streamFuture.completeExceptionally(t); } - return new ExecutionResult(streamFuture, responseFuture); + return responseFuture; } - private CompletableFuture doExecute(CrtRequestContext executionContext, - CompletableFuture responseFuture) { + private void doExecute(CrtRequestContext executionContext, + CompletableFuture responseFuture, + CrtStreamHandler streamHandler) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); @@ -58,7 +56,8 @@ private CompletableFuture doExecute(CrtRequestContext executionC acquireStartTime = System.nanoTime(); } - HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(responseFuture); + InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = + new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler); HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); @@ -77,32 +76,9 @@ private CompletableFuture doExecute(CrtRequestContext executionC if (throwable != null) { Throwable toThrow = wrapCrtException(throwable); responseFuture.completeExceptionally(toThrow); + } else { + streamHandler.setStream(streamBase); } }); - - return streamFuture; - } - - /** - * Holds the result of submitting a request to CRT: the stream (for writing body data via - * {@code writeData}) and the response future (for reading the response). - */ - public static final class ExecutionResult { - private final CompletableFuture streamFuture; - private final CompletableFuture responseFuture; - - ExecutionResult(CompletableFuture streamFuture, - CompletableFuture responseFuture) { - this.streamFuture = streamFuture; - this.responseFuture = responseFuture; - } - - public CompletableFuture streamFuture() { - return streamFuture; - } - - public CompletableFuture responseFuture() { - return responseFuture; - } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java new file mode 100644 index 000000000000..97bdcd8c6f56 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java @@ -0,0 +1,117 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.http.HttpStreamBase; + +/** + * Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations. + * Shared between the request executor (for writing body data) and the response handler (for + * incrementing the window and releasing/closing the connection). + */ +@SdkInternalApi +public final class CrtStreamHandler { + + private final Object streamLock = new Object(); + private final CountDownLatch streamLatch = new CountDownLatch(1); + private HttpStreamBase stream; + private boolean streamClosed; + + /** + * Sets the stream. Called once when the stream is acquired from the connection pool. + */ + public void setStream(HttpStreamBase stream) { + this.stream = stream; + streamLatch.countDown(); + } + + /** + * Blocks until the stream has been acquired. + */ + public void waitForStream() { + try { + streamLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for stream", e); + } + } + + /** + * Write data to the stream. The caller must ensure the stream is ready (via {@link #waitForStream()}) + * before calling this method. + */ + public CompletableFuture writeData(byte[] data, boolean endStream) { + if (streamLatch.getCount() != 0) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new IllegalStateException("writeData called before stream is ready. Call waitForStream() first.")); + return future; + } + synchronized (streamLock) { + if (streamClosed) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new IOException("Stream is already closed, cannot write data.")); + return future; + } + return stream.writeData(data, endStream); + } + } + + public void incrementWindow(int windowSize) { + if (streamLatch.getCount() != 0) { + throw new IllegalStateException("incrementWindow called before stream is ready."); + } + synchronized (streamLock) { + if (!streamClosed) { + stream.incrementWindow(windowSize); + } + } + } + + /** + * Release the connection back to the pool so that it may be reused. This should be called when the request + * completes successfully and the response has been fully consumed. + */ + public void releaseConnection() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; + stream.close(); + } + } + } + + /** + * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the + * connection pool. This should be called on error paths or when the stream is aborted before the response is + * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. + */ + public void closeConnection() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; + stream.cancel(); + stream.close(); + } + } + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index 1beaa872b5f4..df44ab28cbe0 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -48,27 +49,39 @@ public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler { private final SimplePublisher responsePublisher; private final SdkHttpResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; private CrtResponseAdapter(CompletableFuture completionFuture, - SdkAsyncHttpResponseHandler responseHandler) { - this(completionFuture, responseHandler, new SimplePublisher<>()); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + this(completionFuture, responseHandler, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public CrtResponseAdapter(CompletableFuture completionFuture, SdkAsyncHttpResponseHandler responseHandler, SimplePublisher simplePublisher) { + this(completionFuture, responseHandler, simplePublisher, new CrtStreamHandler()); + } + + @SdkTestInternalApi + public CrtResponseAdapter(CompletableFuture completionFuture, + SdkAsyncHttpResponseHandler responseHandler, + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture"); this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler"); this.responseBuilder = SdkHttpResponse.builder(); this.responsePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } public static HttpStreamBaseResponseHandler toCrtResponseHandler( CompletableFuture requestFuture, - SdkAsyncHttpResponseHandler responseHandler) { - return new CrtResponseAdapter(requestFuture, responseHandler); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + return new CrtResponseAdapter(requestFuture, responseHandler, streamHandler); } @Override @@ -89,17 +102,16 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { CompletableFuture writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn)); if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) { - // Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT. return bodyBytesIn.length; } writeFuture.whenComplete((result, failure) -> { if (failure != null) { failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); return 0; @@ -122,7 +134,7 @@ private void onSuccessfulResponseComplete() { } completionFuture.complete(null); }); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } private void onFailedResponseComplete(HttpException error) { @@ -130,7 +142,7 @@ private void onFailedResponseComplete(HttpException error) { Throwable toThrow = wrapWithIoExceptionIfRetryable(error); responsePublisher.error(toThrow); failResponseHandlerAndFuture(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void failResponseHandlerAndFuture(Throwable error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index bb04d71fdb2e..6275f14ae51d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber; import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -45,17 +46,21 @@ public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpS private final CompletableFuture requestCompletionFuture; private final SdkHttpFullResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; - public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture) { - this(requestCompletionFuture, new SimplePublisher<>()); + public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, + CrtStreamHandler streamHandler) { + this(requestCompletionFuture, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, - SimplePublisher simplePublisher) { + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.requestCompletionFuture = requestCompletionFuture; this.responseBuilder = SdkHttpResponse.builder(); this.simplePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } @@ -66,7 +71,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int // Propagate cancellation requestCompletionFuture.exceptionally(t -> { - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return null; }); } @@ -76,7 +81,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { inputStreamSubscriber = AbortableInputStreamSubscriber.builder() - .doAfterClose(() -> responseHandlerHelper.closeConnection()) + .doAfterClose(() -> streamHandler.closeConnection()) .build(); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from @@ -97,10 +102,10 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", failure); requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); // Window will be incremented after the subscriber consumes the data, returning 0 here to disable it. @@ -120,16 +125,15 @@ private void onFailedResponseComplete(int errorCode) { Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); simplePublisher.error(toThrow); requestCompletionFuture.completeExceptionally(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void onSuccessfulResponseComplete() { // For response without a payload, for example, S3 PutObjectResponse, we need to complete the future // in onResponseComplete callback since onResponseBody will never be invoked. - requestCompletionFuture.complete(responseBuilder.build()); // requestCompletionFuture has been completed at this point, no need to notify the future simplePublisher.complete(); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index b90b43210472..4c9d4813f5cf 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -30,20 +30,12 @@ public class ResponseHandlerHelper { private final SdkHttpResponse.Builder responseBuilder; - private HttpStreamBase stream; - private boolean streamClosed; - private final Object streamLock = new Object(); public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) { this.responseBuilder = responseBuilder; } public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { - synchronized (streamLock) { - if (this.stream == null) { - this.stream = stream; - } - } if (headerType == HttpHeaderBlock.MAIN.getValue()) { for (HttpHeader h : nextHeaders) { responseBuilder.appendHeader(h.getName(), h.getValue()); @@ -51,40 +43,4 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int responseBuilder.statusCode(responseStatusCode); } } - - public void incrementWindow(int windowSize) { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - stream.incrementWindow(windowSize); - } - } - } - - /** - * Release the connection back to the pool so that it may be reused. This should be called when the request - * completes successfully and the response has been fully consumed. - */ - public void releaseConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.close(); - } - } - } - - /** - * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the - * connection pool. This should be called on error paths or when the stream is aborted before the response is - * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. - */ - public void closeConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.cancel(); - stream.close(); - } - } - } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 9318f6af228a..959e4286d2c8 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -25,22 +25,24 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStream; import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; +import software.amazon.awssdk.utils.async.SimplePublisher; @ExtendWith(MockitoExtension.class) public abstract class BaseHttpStreamResponseHandlerTest { @@ -54,14 +56,28 @@ public abstract class BaseHttpStreamResponseHandlerTest { HttpStreamBaseResponseHandler responseHandler; - abstract HttpStreamBaseResponseHandler responseHandler(); + CrtStreamHandler streamHandler; + + abstract HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler); - abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher); + abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler); @BeforeEach public void setUp() { requestFuture = new CompletableFuture<>(); - responseHandler = responseHandler(); + + // Simulate CRT refcount behavior: isNull() returns true after close() + AtomicBoolean closed = new AtomicBoolean(false); + Mockito.lenient().when(httpStream.isNull()).thenAnswer(invocation -> closed.get()); + Mockito.lenient().doAnswer((Answer) invocation -> { + closed.set(true); + return null; + }).when(httpStream).close(); + + streamHandler = new CrtStreamHandler(); + streamHandler.setStream(httpStream); + responseHandler = responseHandler(streamHandler); } @Test @@ -98,9 +114,8 @@ void failedToGetResponse_shouldCancelAndCloseStream() { responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -123,7 +138,7 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), @@ -140,9 +155,8 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { // we don't verify here because it behaves differently in async and sync } - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @@ -152,7 +166,7 @@ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWin when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -166,7 +180,7 @@ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWin future.complete(null); requestFuture.join(); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @@ -176,7 +190,7 @@ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindo when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -191,7 +205,7 @@ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindo handler.onResponseComplete(httpStream, 0); requestFuture.join(); verify(httpStream).incrementWindow(anyInt()); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); } static HttpHeader[] getHttpHeaders() { diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 87a2849edc7b..a27e778cd265 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() { .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context).responseFuture(); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index e1caaeb021a9..aba67504c5a0 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -33,34 +33,36 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.utils.async.SimplePublisher; public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher); + return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher, streamHandler); } @Test void onResponseComplete_publisherCancelled_closesStream() { SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler(); - HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java new file mode 100644 index 000000000000..6ac5d39f7d2e --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import software.amazon.awssdk.crt.http.HttpStreamBase; + +@ExtendWith(MockitoExtension.class) +class CrtStreamHandlerTest { + + @Mock + private HttpStreamBase stream; + + private CrtStreamHandler streamHandler; + + @BeforeEach + void setUp() { + AtomicBoolean closed = new AtomicBoolean(false); + Mockito.lenient().when(stream.isNull()).thenAnswer(invocation -> closed.get()); + Mockito.lenient().doAnswer((Answer) invocation -> { + closed.set(true); + return null; + }).when(stream).close(); + + streamHandler = new CrtStreamHandler(); + streamHandler.setStream(stream); + } + + @Test + void releaseConnection_shouldCallClose() { + streamHandler.releaseConnection(); + + verify(stream, never()).cancel(); + verify(stream).close(); + } + + @Test + void closeConnection_shouldCallCancelAndClose() { + streamHandler.closeConnection(); + + verify(stream).cancel(); + verify(stream, Mockito.atLeastOnce()).close(); + } + + @Test + void incrementWindow_afterReleaseConnection_shouldBeNoOp() { + streamHandler.releaseConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_afterCloseConnection_shouldBeNoOp() { + streamHandler.closeConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_beforeClose_shouldWork() { + streamHandler.incrementWindow(1024); + + verify(stream).incrementWindow(1024); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index 8c786624426b..2321ee0c8809 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -31,19 +31,21 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher); + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher, streamHandler); } @Test @@ -63,9 +65,8 @@ void abortStream_shouldCancelAndCloseStream() throws IOException { abortableInputStream.read(); abortableInputStream.abort(); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -85,7 +86,7 @@ void closeStream_shouldCloseStreamWithoutCancel() throws IOException { abortableInputStream.read(); abortableInputStream.close(); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -97,8 +98,7 @@ void cancelFuture_shouldCancelAndCloseStream() { requestFuture.completeExceptionally(new RuntimeException()); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java deleted file mode 100644 index 1714b9930ec4..000000000000 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal; - -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper; - -@ExtendWith(MockitoExtension.class) -class ResponseHandlerHelperTest { - - @Mock - private HttpStreamBase stream; - - private ResponseHandlerHelper helper; - - @BeforeEach - void setUp() { - helper = new ResponseHandlerHelper(SdkHttpResponse.builder()); - // Register the stream via onResponseHeaders - HttpHeader[] headers = { new HttpHeader("Content-Length", "1") }; - helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers); - } - - @Test - void releaseConnection_shouldOnlyCallClose() { - helper.releaseConnection(); - - verify(stream, never()).cancel(); - verify(stream).close(); - } - - @Test - void closeConnection_shouldCallCancelThenClose() { - helper.closeConnection(); - - InOrder inOrder = Mockito.inOrder(stream); - inOrder.verify(stream).cancel(); - inOrder.verify(stream).close(); - } - - @Test - void releaseConnection_calledTwice_shouldOnlyCloseOnce() { - helper.releaseConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_calledTwice_shouldOnlyCloseOnce() { - helper.closeConnection(); - helper.closeConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void releaseConnection_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.closeConnection(); - - verify(stream, never()).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void incrementWindow_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_beforeClose_shouldWork() { - helper.incrementWindow(1024); - - verify(stream).incrementWindow(1024); - } -}