Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based HTTP Client",
"contributor": "",
"description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@

<Class name="~software\.amazon\.awssdk\.messagemanager\.sns\.internal\.SnsHostProvider" />

<!-- BodyChunkPipe is the producer/consumer pipe for the sync CRT client; producer-side
acquireForFill is intentionally blocking on back-pressure and only ever runs on the
caller (sync) thread, never on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.internal\.request\.BodyChunkPipe" />

<!-- CrtHttpRequest.waitForStreamAcquired blocks the caller (sync) thread on the stream
acquisition future with a hard timeout; never runs on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.AwsCrtHttpClient\$CrtHttpRequest" />

<!-- test modules are allowed to make blocking call as parts of their testing -->
<Class name="~.*testutils.*" />
<Class name="~.*s3benchmarks.*" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
Expand All @@ -35,6 +39,7 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand Down Expand Up @@ -102,13 +107,15 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private final CrtRequestContext context;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;
private volatile SyncRequestBodyPump pump;

private CrtHttpRequest(CrtRequestContext context) {
this.context = context;
Expand All @@ -119,7 +126,37 @@ public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();

// Wake a parked producer when CRT signals request failure via responseFuture so the
// pump's blocked acquireForFill() returns instead of holding the caller thread.
if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
pumpRef.abort();
}
});
}

boolean streamAcquired = waitForStreamAcquired(result.streamFuture(),
context.connectionAcquisitionTimeoutMillis());

if (pump != null) {
if (streamAcquired) {
try {
pump.pump();
} catch (IOException ioe) {
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
pump.abort();
}
}

SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
builder.response(response);
builder.responseBody(response.content().orElse(null));
Expand All @@ -128,13 +165,17 @@ public HttpExecuteResponse call() throws IOException {
Throwable cause = e.getCause();

// Complete the future exceptionally to trigger connection cleanup in the response handler.
// Handles thread-interrupt case where joinInterruptibly throws due to
// InterruptedException. Without this, the
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
// Handles the thread-interrupt case where joinInterruptibly throws due to
// InterruptedException, ensuring closeConnection() is invoked to prevent leaking the
// connection from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}

if (pump != null) {
pump.abort();
}

if (cause instanceof IOException) {
throw (IOException) cause;
}
Expand All @@ -156,6 +197,26 @@ public void abort() {
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
if (pump != null) {
pump.abort();
}
}

private boolean waitForStreamAcquired(CompletableFuture<HttpStreamBase> streamFuture, long timeoutMillis) {
if (streamFuture == null) {
return false;
}
// The eventual exception is propagated by the executor's streamFuture.whenComplete via
// requestFuture.completeExceptionally; here we only decide whether to run the body pump.
try {
streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (TimeoutException | ExecutionException e) {
return false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {

protected final long readBufferSize;
protected final Protocol protocol;
protected final long connectionAcquisitionTimeout;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
Expand All @@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxStreamsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ private void doExecute(CrtAsyncRequestContext executionContext,
long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
crtResponseHandler.onAcquireStream(stream);
if (throwable == null) {
crtResponseHandler.onAcquireStream(stream);
}
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ public final class CrtRequestContext {
private final long readBufferSize;
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
private final long connectionAcquisitionTimeoutMillis;

private CrtRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.streamManager = builder.streamManager;
this.metricCollector = request.metricCollector().orElse(null);
this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis;
}

public static Builder builder() {
Expand All @@ -54,10 +56,15 @@ public MetricCollector metricCollector() {
return metricCollector;
}

public long connectionAcquisitionTimeoutMillis() {
return connectionAcquisitionTimeoutMillis;
}

public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
private HttpStreamManager streamManager;
private long connectionAcquisitionTimeoutMillis;

private Builder() {
}
Expand All @@ -77,6 +84,11 @@ public Builder streamManager(HttpStreamManager streamManager) {
return this;
}

public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) {
this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
return this;
}

public CrtRequestContext build() {
return new CrtRequestContext(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,94 @@

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.SyncCrtRequest;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;

@SdkInternalApi
public final class CrtRequestExecutor {

public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
public Result execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

// get acquireStartTime as early as possible for the concurrency timer, but only when metrics are
// enabled since clock_gettime() is a full sys call barrier (multiple mutexes and a hw interrupt).
long acquireStartTime = shouldPublishMetrics ? System.nanoTime() : 0;

try {
doExecute(executionContext, requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;
streamFuture.whenComplete((streamBase, throwable) -> {
if (throwable != null) {
requestFuture.completeExceptionally(wrapCrtException(throwable));

} else {
crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
}
});

return new Result(requestFuture, syncCrtRequest.pump(), streamFuture);
} catch (Throwable t) {
requestFuture.completeExceptionally(t);
return new Result(requestFuture, null, null);
}

return requestFuture;
}

private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

long acquireStartTime = 0;

if (shouldPublishMetrics) {
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
acquireStartTime = System.nanoTime();
/**
* Result of {@link #execute(CrtRequestContext)}: bundles the response future with the optional
* caller-thread body pump (null when the request has no body) and the future that completes
* when the CRT stream has been acquired from the connection pool.
*/
public static final class Result {
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
private final SyncRequestBodyPump pump;
private final CompletableFuture<HttpStreamBase> streamFuture;

Result(CompletableFuture<SdkHttpFullResponse> responseFuture,
SyncRequestBodyPump pump,
CompletableFuture<HttpStreamBase> streamFuture) {
this.responseFuture = responseFuture;
this.pump = pump;
this.streamFuture = streamFuture;
}

InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
return responseFuture;
}

streamFuture.whenComplete((streamBase, throwable) -> {
crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
public SyncRequestBodyPump pump() {
return pump;
}

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
}
});
/**
* Future that completes when the CRT stream has been acquired (or acquisition has failed).
* The caller blocks on this before running the body pump so per-request body buffers are
* not allocated while a request is queued on the connection pool.
*/
public CompletableFuture<HttpStreamBase> streamFuture() {
return streamFuture;
}
}
}
Loading
Loading