diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json new file mode 100644 index 000000000000..f0cc6298ec3c --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 9c6d769e48fa..d81c286fc5f4 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -18,13 +18,16 @@ import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; @@ -35,6 +38,8 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; +import software.amazon.awssdk.http.crt.internal.CrtUtils; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { } private static final class CrtHttpRequest implements ExecutableHttpRequest { + private static final int WRITE_BUFFER_SIZE = 16 * 1024; + private final CrtRequestContext context; private volatile CompletableFuture responseFuture; @@ -119,7 +126,14 @@ public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); try { - responseFuture = new CrtRequestExecutor().execute(context); + CrtStreamHandler streamHandler = new CrtStreamHandler(); + responseFuture = new CrtRequestExecutor().execute(context, streamHandler); + + // Write the request body from the caller thread via the stream handler, + // which guards against concurrent stream close with a synchronized block. + // This avoids blocking the CRT event loop thread in InputStream.read(). + writeRequestBody(streamHandler); + SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); builder.response(response); builder.responseBody(response.content().orElse(null)); @@ -140,6 +154,10 @@ public HttpExecuteResponse call() throws IOException { } if (cause instanceof HttpException) { + Throwable wrapped = CrtUtils.wrapCrtException(cause); + if (wrapped instanceof IOException) { + throw (IOException) wrapped; + } throw (HttpException) cause; } @@ -151,6 +169,24 @@ public HttpExecuteResponse call() throws IOException { } } + private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException { + ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null); + if (provider == null) { + return; + } + + streamHandler.waitForStream(); + try (InputStream inputStream = provider.newStream()) { + byte[] buf = new byte[WRITE_BUFFER_SIZE]; + int read; + while ((read = inputStream.read(buf, 0, buf.length)) >= 0) { + byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read); + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false)); + } + CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true)); + } + } + @Override public void abort() { if (responseFuture != null) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 7629683899a5..e3dfe024e9cd 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -67,8 +67,10 @@ private void doExecute(CrtAsyncRequestContext executionContext, HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext); + CrtStreamHandler streamHandler = new CrtStreamHandler(); + HttpStreamBaseResponseHandler crtResponseHandler = - CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler()); + CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler); CompletableFuture streamFuture = executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); @@ -83,6 +85,8 @@ private void doExecute(CrtAsyncRequestContext executionContext, if (throwable != null) { Throwable toThrow = wrapCrtException(throwable); reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler()); + } else { + streamHandler.setStream(stream); } }); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index 7165696435e1..98ace2a65766 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -22,7 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -32,37 +31,40 @@ @SdkInternalApi public final class CrtRequestExecutor { - public CompletableFuture execute(CrtRequestContext executionContext) { - CompletableFuture requestFuture = new CompletableFuture<>(); + public CompletableFuture execute(CrtRequestContext executionContext, + CrtStreamHandler streamHandler) { + CompletableFuture responseFuture = new CompletableFuture<>(); try { - doExecute(executionContext, requestFuture); + doExecute(executionContext, responseFuture, streamHandler); } catch (Throwable t) { - requestFuture.completeExceptionally(t); + responseFuture.completeExceptionally(t); } - return requestFuture; + return responseFuture; } - private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { + private void doExecute(CrtRequestContext executionContext, + CompletableFuture responseFuture, + CrtStreamHandler streamHandler) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); long acquireStartTime = 0; if (shouldPublishMetrics) { - // go ahead and get acquireStartTime for the concurrency timer as early as possible, - // so it's as accurate as possible, but only do it in a branch since clock_gettime() - // results in a full sys call barrier (multiple mutexes and a hw interrupt). acquireStartTime = System.nanoTime(); } - HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = + new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler); HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); + boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent(); + CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody); long finalAcquireStartTime = acquireStartTime; @@ -73,7 +75,9 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture writeData(byte[] data, boolean endStream) { + if (streamLatch.getCount() != 0) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new IllegalStateException("writeData called before stream is ready. Call waitForStream() first.")); + return future; + } + synchronized (streamLock) { + if (streamClosed) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new IOException("Stream is already closed, cannot write data.")); + return future; + } + return stream.writeData(data, endStream); + } + } + + public void incrementWindow(int windowSize) { + if (streamLatch.getCount() != 0) { + throw new IllegalStateException("incrementWindow called before stream is ready."); + } + synchronized (streamLock) { + if (!streamClosed) { + stream.incrementWindow(windowSize); + } + } + } + + /** + * Release the connection back to the pool so that it may be reused. This should be called when the request + * completes successfully and the response has been fully consumed. + */ + public void releaseConnection() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; + stream.close(); + } + } + } + + /** + * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the + * connection pool. This should be called on error paths or when the stream is aborted before the response is + * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. + */ + public void closeConnection() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; + stream.cancel(); + stream.close(); + } + } + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 8672d80b0d1b..e97b1adb514e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); String finalEncodedPath = encodedPath + encodedQueryString; - return sdkExecuteRequest.contentStreamProvider() - .map(provider -> new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, - new CrtRequestInputStreamAdapter(provider))) - .orElse(new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, null)); + return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null); } private static HttpHeader[] asArray(List crtHeaderList) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java deleted file mode 100644 index 68f418b9e1df..000000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal.request; - -import static java.lang.Math.min; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.ContentStreamProvider; - -@SdkInternalApi -final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream { - private static final int READ_BUFFER_SIZE = 16 * 1024; - - private final ContentStreamProvider provider; - private volatile InputStream providerStream; - private final byte[] readBuffer = new byte[READ_BUFFER_SIZE]; - - CrtRequestInputStreamAdapter(ContentStreamProvider provider) { - this.provider = provider; - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - int read; - - try { - if (providerStream == null) { - createNewStream(); - } - - int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining()); - read = providerStream.read(readBuffer, 0, toRead); - - if (read > 0) { - bodyBytesOut.put(readBuffer, 0, read); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return read < 0; - } - - @Override - public boolean resetPosition() { - try { - createNewStream(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return true; - } - - private void createNewStream() throws IOException { - if (providerStream != null) { - providerStream.close(); - } - providerStream = provider.newStream(); - } -} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index 1beaa872b5f4..df44ab28cbe0 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -48,27 +49,39 @@ public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler { private final SimplePublisher responsePublisher; private final SdkHttpResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; private CrtResponseAdapter(CompletableFuture completionFuture, - SdkAsyncHttpResponseHandler responseHandler) { - this(completionFuture, responseHandler, new SimplePublisher<>()); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + this(completionFuture, responseHandler, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public CrtResponseAdapter(CompletableFuture completionFuture, SdkAsyncHttpResponseHandler responseHandler, SimplePublisher simplePublisher) { + this(completionFuture, responseHandler, simplePublisher, new CrtStreamHandler()); + } + + @SdkTestInternalApi + public CrtResponseAdapter(CompletableFuture completionFuture, + SdkAsyncHttpResponseHandler responseHandler, + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture"); this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler"); this.responseBuilder = SdkHttpResponse.builder(); this.responsePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } public static HttpStreamBaseResponseHandler toCrtResponseHandler( CompletableFuture requestFuture, - SdkAsyncHttpResponseHandler responseHandler) { - return new CrtResponseAdapter(requestFuture, responseHandler); + SdkAsyncHttpResponseHandler responseHandler, + CrtStreamHandler streamHandler) { + return new CrtResponseAdapter(requestFuture, responseHandler, streamHandler); } @Override @@ -89,17 +102,16 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { CompletableFuture writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn)); if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) { - // Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT. return bodyBytesIn.length; } writeFuture.whenComplete((result, failure) -> { if (failure != null) { failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); return 0; @@ -122,7 +134,7 @@ private void onSuccessfulResponseComplete() { } completionFuture.complete(null); }); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } private void onFailedResponseComplete(HttpException error) { @@ -130,7 +142,7 @@ private void onFailedResponseComplete(HttpException error) { Throwable toThrow = wrapWithIoExceptionIfRetryable(error); responsePublisher.error(toThrow); failResponseHandlerAndFuture(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void failResponseHandlerAndFuture(Throwable error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index bb04d71fdb2e..6275f14ae51d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber; import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -45,17 +46,21 @@ public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpS private final CompletableFuture requestCompletionFuture; private final SdkHttpFullResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; + private final CrtStreamHandler streamHandler; - public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture) { - this(requestCompletionFuture, new SimplePublisher<>()); + public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, + CrtStreamHandler streamHandler) { + this(requestCompletionFuture, new SimplePublisher<>(), streamHandler); } @SdkTestInternalApi public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, - SimplePublisher simplePublisher) { + SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { this.requestCompletionFuture = requestCompletionFuture; this.responseBuilder = SdkHttpResponse.builder(); this.simplePublisher = simplePublisher; + this.streamHandler = streamHandler; this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } @@ -66,7 +71,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int // Propagate cancellation requestCompletionFuture.exceptionally(t -> { - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return null; }); } @@ -76,7 +81,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { inputStreamSubscriber = AbortableInputStreamSubscriber.builder() - .doAfterClose(() -> responseHandlerHelper.closeConnection()) + .doAfterClose(() -> streamHandler.closeConnection()) .build(); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from @@ -97,10 +102,10 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", failure); requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); return; } - responseHandlerHelper.incrementWindow(bodyBytesIn.length); + streamHandler.incrementWindow(bodyBytesIn.length); }); // Window will be incremented after the subscriber consumes the data, returning 0 here to disable it. @@ -120,16 +125,15 @@ private void onFailedResponseComplete(int errorCode) { Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); simplePublisher.error(toThrow); requestCompletionFuture.completeExceptionally(toThrow); - responseHandlerHelper.closeConnection(); + streamHandler.closeConnection(); } private void onSuccessfulResponseComplete() { // For response without a payload, for example, S3 PutObjectResponse, we need to complete the future // in onResponseComplete callback since onResponseBody will never be invoked. - requestCompletionFuture.complete(responseBuilder.build()); // requestCompletionFuture has been completed at this point, no need to notify the future simplePublisher.complete(); - responseHandlerHelper.releaseConnection(); + streamHandler.releaseConnection(); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index b90b43210472..4c9d4813f5cf 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -30,20 +30,12 @@ public class ResponseHandlerHelper { private final SdkHttpResponse.Builder responseBuilder; - private HttpStreamBase stream; - private boolean streamClosed; - private final Object streamLock = new Object(); public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) { this.responseBuilder = responseBuilder; } public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { - synchronized (streamLock) { - if (this.stream == null) { - this.stream = stream; - } - } if (headerType == HttpHeaderBlock.MAIN.getValue()) { for (HttpHeader h : nextHeaders) { responseBuilder.appendHeader(h.getName(), h.getValue()); @@ -51,40 +43,4 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int responseBuilder.statusCode(responseStatusCode); } } - - public void incrementWindow(int windowSize) { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - stream.incrementWindow(windowSize); - } - } - } - - /** - * Release the connection back to the pool so that it may be reused. This should be called when the request - * completes successfully and the response has been fully consumed. - */ - public void releaseConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.close(); - } - } - } - - /** - * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the - * connection pool. This should be called on error paths or when the stream is aborted before the response is - * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. - */ - public void closeConnection() { - synchronized (streamLock) { - if (!streamClosed && stream != null) { - streamClosed = true; - stream.cancel(); - stream.close(); - } - } - } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 9318f6af228a..959e4286d2c8 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -25,22 +25,24 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStream; import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; +import software.amazon.awssdk.utils.async.SimplePublisher; @ExtendWith(MockitoExtension.class) public abstract class BaseHttpStreamResponseHandlerTest { @@ -54,14 +56,28 @@ public abstract class BaseHttpStreamResponseHandlerTest { HttpStreamBaseResponseHandler responseHandler; - abstract HttpStreamBaseResponseHandler responseHandler(); + CrtStreamHandler streamHandler; + + abstract HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler); - abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher); + abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler); @BeforeEach public void setUp() { requestFuture = new CompletableFuture<>(); - responseHandler = responseHandler(); + + // Simulate CRT refcount behavior: isNull() returns true after close() + AtomicBoolean closed = new AtomicBoolean(false); + Mockito.lenient().when(httpStream.isNull()).thenAnswer(invocation -> closed.get()); + Mockito.lenient().doAnswer((Answer) invocation -> { + closed.set(true); + return null; + }).when(httpStream).close(); + + streamHandler = new CrtStreamHandler(); + streamHandler.setStream(httpStream); + responseHandler = responseHandler(streamHandler); } @Test @@ -98,9 +114,8 @@ void failedToGetResponse_shouldCancelAndCloseStream() { responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -123,7 +138,7 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), @@ -140,9 +155,8 @@ void publisherWritesFutureFails_shouldCancelAndCloseStream() { // we don't verify here because it behaves differently in async and sync } - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @@ -152,7 +166,7 @@ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWin when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -166,7 +180,7 @@ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWin future.complete(null); requestFuture.join(); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @@ -176,7 +190,7 @@ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindo when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -191,7 +205,7 @@ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindo handler.onResponseComplete(httpStream, 0); requestFuture.join(); verify(httpStream).incrementWindow(anyInt()); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); } static HttpHeader[] getHttpHeaders() { diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 456000ac1150..a27e778cd265 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -98,11 +98,11 @@ public void execute_acquireStreamFails_wrapsWithIOException() { CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -113,10 +113,10 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(throwable); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -130,10 +130,10 @@ public void execute_httpException_mapsToCorrectException(Entry completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @@ -143,10 +143,10 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { CrtRequestContext context = crtRequestContext(); CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean())) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context, new CrtStreamHandler()); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index e1caaeb021a9..aba67504c5a0 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -33,34 +33,36 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.utils.async.SimplePublisher; public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher); + return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher, streamHandler); } @Test void onResponseComplete_publisherCancelled_closesStream() { SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler(); - HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); + HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler, streamHandler); HttpHeader[] httpHeaders = getHttpHeaders(); crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java new file mode 100644 index 000000000000..6ac5d39f7d2e --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandlerTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import software.amazon.awssdk.crt.http.HttpStreamBase; + +@ExtendWith(MockitoExtension.class) +class CrtStreamHandlerTest { + + @Mock + private HttpStreamBase stream; + + private CrtStreamHandler streamHandler; + + @BeforeEach + void setUp() { + AtomicBoolean closed = new AtomicBoolean(false); + Mockito.lenient().when(stream.isNull()).thenAnswer(invocation -> closed.get()); + Mockito.lenient().doAnswer((Answer) invocation -> { + closed.set(true); + return null; + }).when(stream).close(); + + streamHandler = new CrtStreamHandler(); + streamHandler.setStream(stream); + } + + @Test + void releaseConnection_shouldCallClose() { + streamHandler.releaseConnection(); + + verify(stream, never()).cancel(); + verify(stream).close(); + } + + @Test + void closeConnection_shouldCallCancelAndClose() { + streamHandler.closeConnection(); + + verify(stream).cancel(); + verify(stream, Mockito.atLeastOnce()).close(); + } + + @Test + void incrementWindow_afterReleaseConnection_shouldBeNoOp() { + streamHandler.releaseConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_afterCloseConnection_shouldBeNoOp() { + streamHandler.closeConnection(); + streamHandler.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_beforeClose_shouldWork() { + streamHandler.incrementWindow(1024); + + verify(stream).incrementWindow(1024); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index 8c786624426b..2321ee0c8809 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -31,19 +31,21 @@ import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.crt.internal.CrtStreamHandler; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamBaseResponseHandler responseHandler() { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + HttpStreamBaseResponseHandler responseHandler(CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, streamHandler); } @Override - HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { - return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher); + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher, + CrtStreamHandler streamHandler) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher, streamHandler); } @Test @@ -63,9 +65,8 @@ void abortStream_shouldCancelAndCloseStream() throws IOException { abortableInputStream.read(); abortableInputStream.abort(); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -85,7 +86,7 @@ void closeStream_shouldCloseStreamWithoutCancel() throws IOException { abortableInputStream.read(); abortableInputStream.close(); - verify(httpStream).close(); + verify(httpStream, Mockito.atLeastOnce()).close(); } @Test @@ -97,8 +98,7 @@ void cancelFuture_shouldCancelAndCloseStream() { requestFuture.completeExceptionally(new RuntimeException()); - InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); - inOrder.verify(httpStream).close(); + verify(httpStream).cancel(); + verify(httpStream, Mockito.atLeastOnce()).close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java deleted file mode 100644 index 1714b9930ec4..000000000000 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal; - -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper; - -@ExtendWith(MockitoExtension.class) -class ResponseHandlerHelperTest { - - @Mock - private HttpStreamBase stream; - - private ResponseHandlerHelper helper; - - @BeforeEach - void setUp() { - helper = new ResponseHandlerHelper(SdkHttpResponse.builder()); - // Register the stream via onResponseHeaders - HttpHeader[] headers = { new HttpHeader("Content-Length", "1") }; - helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers); - } - - @Test - void releaseConnection_shouldOnlyCallClose() { - helper.releaseConnection(); - - verify(stream, never()).cancel(); - verify(stream).close(); - } - - @Test - void closeConnection_shouldCallCancelThenClose() { - helper.closeConnection(); - - InOrder inOrder = Mockito.inOrder(stream); - inOrder.verify(stream).cancel(); - inOrder.verify(stream).close(); - } - - @Test - void releaseConnection_calledTwice_shouldOnlyCloseOnce() { - helper.releaseConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_calledTwice_shouldOnlyCloseOnce() { - helper.closeConnection(); - helper.closeConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void releaseConnection_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.releaseConnection(); - - verify(stream, Mockito.times(1)).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void closeConnection_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.closeConnection(); - - verify(stream, never()).cancel(); - verify(stream, Mockito.times(1)).close(); - } - - @Test - void incrementWindow_afterReleaseConnection_shouldBeNoOp() { - helper.releaseConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_afterCloseConnection_shouldBeNoOp() { - helper.closeConnection(); - helper.incrementWindow(1024); - - verify(stream, never()).incrementWindow(1024); - } - - @Test - void incrementWindow_beforeClose_shouldWork() { - helper.incrementWindow(1024); - - verify(stream).incrementWindow(1024); - } -} diff --git a/pom.xml b/pom.xml index 1b6bee34df5e..1848cf1dd892 100644 --- a/pom.xml +++ b/pom.xml @@ -131,7 +131,7 @@ 3.1.5 1.17.1 1.37 - 0.45.1 + 1.0.0-SNAPSHOT 5.10.3