Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
Expand All @@ -35,6 +38,8 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.CrtStreamHandler;
import software.amazon.awssdk.http.crt.internal.CrtUtils;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand Down Expand Up @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final int WRITE_BUFFER_SIZE = 16 * 1024;

private final CrtRequestContext context;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;

Expand All @@ -119,7 +126,14 @@ public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtStreamHandler streamHandler = new CrtStreamHandler();
responseFuture = new CrtRequestExecutor().execute(context, streamHandler);

// Write the request body from the caller thread via the stream handler,
// which guards against concurrent stream close with a synchronized block.
// This avoids blocking the CRT event loop thread in InputStream.read().
writeRequestBody(streamHandler);

SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
builder.response(response);
builder.responseBody(response.content().orElse(null));
Expand All @@ -140,6 +154,10 @@ public HttpExecuteResponse call() throws IOException {
}

if (cause instanceof HttpException) {
Throwable wrapped = CrtUtils.wrapCrtException(cause);
if (wrapped instanceof IOException) {
throw (IOException) wrapped;
}
throw (HttpException) cause;
}

Expand All @@ -151,6 +169,24 @@ public HttpExecuteResponse call() throws IOException {
}
}

private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException {
ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null);
if (provider == null) {
return;
}

streamHandler.waitForStream();
try (InputStream inputStream = provider.newStream()) {
byte[] buf = new byte[WRITE_BUFFER_SIZE];
int read;
while ((read = inputStream.read(buf, 0, buf.length)) >= 0) {
byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read);
CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false));
}
Comment on lines +182 to +185
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the input stream has no data available, do we just keep looping until we find data? Seems like we will end up with hot looping and wasting CPU loops for this case.

And, a bit optimization, when there is no data, it will be better to skip invoking the writeData, so that less work to be done

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per InputStream contract, a valid implementation shouldn't return 0 from read() method; it just blocks until data arrives or returns -1 at EOF.

If len is zero, then no bytes are read and 0 is returned; otherwise, there is an attempt to read at least one byte. If no byte is available because the stream is at end of file, the value -1 is returned; otherwise, at least one byte is read and stored into b.

CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true));
}
}

@Override
public void abort() {
if (responseFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ private void doExecute(CrtAsyncRequestContext executionContext,

HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);

CrtStreamHandler streamHandler = new CrtStreamHandler();

HttpStreamBaseResponseHandler crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
Expand All @@ -83,6 +85,8 @@ private void doExecute(CrtAsyncRequestContext executionContext,
if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
} else {
streamHandler.setStream(stream);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
Expand All @@ -32,37 +31,40 @@
@SdkInternalApi
public final class CrtRequestExecutor {

public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext,
CrtStreamHandler streamHandler) {
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();

try {
doExecute(executionContext, requestFuture);
doExecute(executionContext, responseFuture, streamHandler);
} catch (Throwable t) {
requestFuture.completeExceptionally(t);
responseFuture.completeExceptionally(t);
}

return requestFuture;
return responseFuture;
}

private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
private void doExecute(CrtRequestContext executionContext,
CompletableFuture<SdkHttpFullResponse> responseFuture,
CrtStreamHandler streamHandler) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

long acquireStartTime = 0;

if (shouldPublishMetrics) {
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
acquireStartTime = System.nanoTime();
}

HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);

long finalAcquireStartTime = acquireStartTime;

Expand All @@ -73,7 +75,9 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
responseFuture.completeExceptionally(toThrow);
} else {
streamHandler.setStream(streamBase);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpStreamBase;

/**
* Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations.
* Shared between the request executor (for writing body data) and the response handler (for
* incrementing the window and releasing/closing the connection).
*/
@SdkInternalApi
public final class CrtStreamHandler {

private final Object streamLock = new Object();
private final CountDownLatch streamLatch = new CountDownLatch(1);
private HttpStreamBase stream;
private boolean streamClosed;

/**
* Sets the stream. Called once when the stream is acquired from the connection pool.
*/
public void setStream(HttpStreamBase stream) {
this.stream = stream;
streamLatch.countDown();
}

/**
* Blocks until the stream has been acquired.
*/
public void waitForStream() {
try {
streamLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for stream", e);
}
}

/**
* Write data to the stream. The caller must ensure the stream is ready (via {@link #waitForStream()})
* before calling this method.
*/
public CompletableFuture<Void> writeData(byte[] data, boolean endStream) {
if (streamLatch.getCount() != 0) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(
new IllegalStateException("writeData called before stream is ready. Call waitForStream() first."));
return future;
}
synchronized (streamLock) {
if (streamClosed) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(
new IOException("Stream is already closed, cannot write data."));
return future;
}
return stream.writeData(data, endStream);
}
}

public void incrementWindow(int windowSize) {
if (streamLatch.getCount() != 0) {
throw new IllegalStateException("incrementWindow called before stream is ready.");
}
synchronized (streamLock) {
if (!streamClosed) {
stream.incrementWindow(windowSize);
}
}
}

/**
* Release the connection back to the pool so that it may be reused. This should be called when the request
* completes successfully and the response has been fully consumed.
*/
public void releaseConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
stream.close();
}
}
}

/**
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
* connection pool. This should be called on error paths or when the stream is aborted before the response is
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
*/
public void closeConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
stream.cancel();
stream.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));

String finalEncodedPath = encodedPath + encodedQueryString;
return sdkExecuteRequest.contentStreamProvider()
.map(provider -> new HttpRequest(method,
finalEncodedPath,
crtHeaderArray,
new CrtRequestInputStreamAdapter(provider)))
.orElse(new HttpRequest(method,
finalEncodedPath,
crtHeaderArray, null));
return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null);
}

private static HttpHeader[] asArray(List<HttpHeader> crtHeaderList) {
Expand Down

This file was deleted.

Loading
Loading