Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based HTTP Client",
"contributor": "",
"description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@

<Class name="~software\.amazon\.awssdk\.messagemanager\.sns\.internal\.SnsHostProvider" />

<!-- BodyChunkPipe is the producer/consumer pipe for the sync CRT client; producer-side
acquireForFill is intentionally blocking on back-pressure and only ever runs on the
caller (sync) thread, never on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.internal\.request\.BodyChunkPipe" />

<!-- test modules are allowed to make blocking call as parts of their testing -->
<Class name="~.*testutils.*" />
<Class name="~.*s3benchmarks.*" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
Expand All @@ -35,6 +36,7 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand Down Expand Up @@ -109,6 +111,7 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
private static final class CrtHttpRequest implements ExecutableHttpRequest {
private final CrtRequestContext context;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;
private volatile SyncRequestBodyPump pump;

private CrtHttpRequest(CrtRequestContext context) {
this.context = context;
Expand All @@ -119,7 +122,38 @@ public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();

// Abort the pump to unblock a parked producer when CRT signals request failure
// via responseFuture.
if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
pumpRef.abort();
}
});
}

boolean streamAcquired = waitForStreamAcquired(result.streamFuture());

// No body case (pump == null): CRT writes the request line + headers when the stream
// is acquired and emits no body callbacks. We only need to join responseFuture below.
if (pump != null) {
if (streamAcquired) {
try {
pump.pump();
} catch (IOException ioe) {
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
pump.abort();
}
}

SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
builder.response(response);
builder.responseBody(response.content().orElse(null));
Expand All @@ -128,13 +162,17 @@ public HttpExecuteResponse call() throws IOException {
Throwable cause = e.getCause();

// Complete the future exceptionally to trigger connection cleanup in the response handler.
// Handles thread-interrupt case where joinInterruptibly throws due to
// InterruptedException. Without this, the
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
// Handles the thread-interrupt case where joinInterruptibly throws due to
// InterruptedException, ensuring closeConnection() is invoked to prevent leaking the
// connection from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}

if (pump != null) {
pump.abort();
}

if (cause instanceof IOException) {
throw (IOException) cause;
}
Expand All @@ -156,6 +194,21 @@ public void abort() {
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
if (pump != null) {
pump.abort();
}
}

private boolean waitForStreamAcquired(CompletableFuture<HttpStreamBase> streamFuture) {
if (streamFuture == null) {
return false;
}
try {
CompletableFutureUtils.joinInterruptibly(streamFuture);
return true;
} catch (CompletionException e) {
return false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,92 @@

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.SyncCrtRequest;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;

@SdkInternalApi
public final class CrtRequestExecutor {

public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
public Result execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

// get acquireStartTime as early as possible for the concurrency timer, but only when metrics are
// enabled since clock_gettime() is a full sys call barrier (multiple mutexes and a hw interrupt).
long acquireStartTime = shouldPublishMetrics ? System.nanoTime() : 0;

try {
doExecute(executionContext, requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;
streamFuture.whenComplete((streamBase, throwable) -> {
crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
if (throwable != null) {
requestFuture.completeExceptionally(wrapCrtException(throwable));
}
});

return new Result(requestFuture, syncCrtRequest.pump(), streamFuture);
} catch (Throwable t) {
requestFuture.completeExceptionally(t);
return new Result(requestFuture, null, null);
}

return requestFuture;
}

private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

long acquireStartTime = 0;

if (shouldPublishMetrics) {
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
acquireStartTime = System.nanoTime();
/**
* Result of {@link #execute(CrtRequestContext)}: bundles the response future with the optional
* caller-thread body pump (null when the request has no body) and the future that completes
* when the CRT stream has been acquired from the connection pool.
*/
public static final class Result {
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
private final SyncRequestBodyPump pump;
private final CompletableFuture<HttpStreamBase> streamFuture;

Result(CompletableFuture<SdkHttpFullResponse> responseFuture,
SyncRequestBodyPump pump,
CompletableFuture<HttpStreamBase> streamFuture) {
this.responseFuture = responseFuture;
this.pump = pump;
this.streamFuture = streamFuture;
}

InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
return responseFuture;
}

streamFuture.whenComplete((streamBase, throwable) -> {
crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
public SyncRequestBodyPump pump() {
return pump;
}

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
}
});
/**
* Future that completes when the CRT stream has been acquired (or acquisition has failed).
* The caller blocks on this before running the body pump so per-request body buffers are
* not allocated while a request is queued on the connection pool.
*/
public CompletableFuture<HttpStreamBase> streamFuture() {
return streamFuture;
}
}
}
Loading
Loading