Skip to content

Commit 420f052

Browse files
Bound JdkHttpSender thread pool to prevent DoS via unbounded thread creation (#8276)
1 parent 5f5b054 commit 420f052

2 files changed

Lines changed: 185 additions & 19 deletions

File tree

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.ConcurrentLinkedQueue;
3333
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.RejectedExecutionException;
3435
import java.util.concurrent.SynchronousQueue;
3536
import java.util.concurrent.ThreadLocalRandom;
3637
import java.util.concurrent.ThreadPoolExecutor;
@@ -133,7 +134,7 @@ public final class JdkHttpSender implements HttpSender {
133134
private static ExecutorService newExecutor() {
134135
return new ThreadPoolExecutor(
135136
0,
136-
Integer.MAX_VALUE,
137+
Math.max(Runtime.getRuntime().availableProcessors(), 5),
137138
60,
138139
TimeUnit.SECONDS,
139140
new SynchronousQueue<>(),
@@ -157,24 +158,28 @@ private static HttpClient configureClient(
157158
@Override
158159
public void send(
159160
MessageWriter messageWriter, Consumer<HttpResponse> onResponse, Consumer<Throwable> onError) {
160-
CompletableFuture<HttpResponse> unused =
161-
CompletableFuture.supplyAsync(
162-
() -> {
163-
try {
164-
return sendInternal(messageWriter);
165-
} catch (IOException e) {
166-
throw new UncheckedIOException(e);
167-
}
168-
},
169-
executorService)
170-
.whenComplete(
171-
(httpResponse, throwable) -> {
172-
if (throwable != null) {
173-
onError.accept(throwable);
174-
return;
175-
}
176-
onResponse.accept(httpResponse);
177-
});
161+
try {
162+
CompletableFuture<HttpResponse> unused =
163+
CompletableFuture.supplyAsync(
164+
() -> {
165+
try {
166+
return sendInternal(messageWriter);
167+
} catch (IOException e) {
168+
throw new UncheckedIOException(e);
169+
}
170+
},
171+
executorService)
172+
.whenComplete(
173+
(httpResponse, throwable) -> {
174+
if (throwable != null) {
175+
onError.accept(throwable);
176+
return;
177+
}
178+
onResponse.accept(httpResponse);
179+
});
180+
} catch (RejectedExecutionException e) {
181+
onError.accept(e);
182+
}
178183
}
179184

180185
// Visible for testing

exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,36 @@
99
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
1010
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
1111
import static org.mockito.ArgumentMatchers.any;
12+
import static org.mockito.Mockito.doReturn;
1213
import static org.mockito.Mockito.doThrow;
14+
import static org.mockito.Mockito.mock;
1315
import static org.mockito.Mockito.times;
1416
import static org.mockito.Mockito.verify;
1517
import static org.mockito.Mockito.when;
1618

1719
import io.opentelemetry.sdk.common.CompletableResultCode;
20+
import io.opentelemetry.sdk.common.export.HttpResponse;
1821
import io.opentelemetry.sdk.common.export.MessageWriter;
1922
import io.opentelemetry.sdk.common.export.RetryPolicy;
23+
import java.io.ByteArrayInputStream;
2024
import java.io.IOException;
25+
import java.io.InputStream;
2126
import java.io.OutputStream;
2227
import java.lang.reflect.Method;
2328
import java.net.ConnectException;
2429
import java.net.ServerSocket;
2530
import java.net.URI;
2631
import java.net.http.HttpClient;
2732
import java.net.http.HttpConnectTimeoutException;
33+
import java.net.http.HttpHeaders;
2834
import java.time.Duration;
2935
import java.util.Collections;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.RejectedExecutionException;
38+
import java.util.concurrent.SynchronousQueue;
39+
import java.util.concurrent.ThreadPoolExecutor;
3040
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicReference;
3142
import javax.net.ssl.SSLException;
3243
import org.assertj.core.api.InstanceOfAssertFactories;
3344
import org.junit.jupiter.api.BeforeEach;
@@ -166,6 +177,38 @@ void sendInternal_NonRetryableException() throws IOException, InterruptedExcepti
166177
verify(mockHttpClient, times(1)).send(any(), any());
167178
}
168179

180+
@Test
181+
void defaultExecutor_isBounded() {
182+
JdkHttpSender defaultSender =
183+
new JdkHttpSender(
184+
URI.create("http://localhost"),
185+
"text/plain",
186+
null,
187+
Duration.ofNanos(1),
188+
Duration.ofSeconds(10),
189+
Collections::emptyMap,
190+
null,
191+
null,
192+
null,
193+
null,
194+
Long.MAX_VALUE);
195+
196+
try {
197+
int expectedMax = Math.max(Runtime.getRuntime().availableProcessors(), 5);
198+
assertThat(defaultSender)
199+
.extracting(
200+
"executorService", as(InstanceOfAssertFactories.type(ThreadPoolExecutor.class)))
201+
.satisfies(
202+
executor -> {
203+
assertThat(executor.getMaximumPoolSize()).isEqualTo(expectedMax);
204+
assertThat(executor.getRejectedExecutionHandler())
205+
.isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
206+
});
207+
} finally {
208+
defaultSender.shutdown();
209+
}
210+
}
211+
169212
@Test
170213
void connectTimeout() {
171214
sender =
@@ -189,6 +232,124 @@ void connectTimeout() {
189232
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
190233
}
191234

235+
@SuppressWarnings("unchecked")
236+
@Test
237+
void send_successfulResponse_callsOnResponse() throws Exception {
238+
java.net.http.HttpResponse<InputStream> mockJdkResponse =
239+
mock(java.net.http.HttpResponse.class);
240+
when(mockJdkResponse.statusCode()).thenReturn(200);
241+
when(mockJdkResponse.body()).thenReturn(new ByteArrayInputStream(new byte[0]));
242+
when(mockJdkResponse.headers())
243+
.thenReturn(HttpHeaders.of(Collections.emptyMap(), (a, b) -> true));
244+
doReturn(mockJdkResponse).when(mockHttpClient).send(any(), any());
245+
246+
JdkHttpSender testSender =
247+
new JdkHttpSender(
248+
mockHttpClient,
249+
URI.create("http://localhost"),
250+
"text/plain",
251+
null,
252+
Duration.ofSeconds(10),
253+
Collections::emptyMap,
254+
null,
255+
null,
256+
Long.MAX_VALUE);
257+
258+
try {
259+
CountDownLatch latch = new CountDownLatch(1);
260+
AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
261+
AtomicReference<Throwable> errorRef = new AtomicReference<>();
262+
263+
testSender.send(
264+
new NoOpRequestBodyWriter(),
265+
response -> {
266+
responseRef.set(response);
267+
latch.countDown();
268+
},
269+
error -> {
270+
errorRef.set(error);
271+
latch.countDown();
272+
});
273+
274+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
275+
assertThat(responseRef.get()).isNotNull();
276+
assertThat(responseRef.get().getStatusCode()).isEqualTo(200);
277+
assertThat(errorRef.get()).isNull();
278+
} finally {
279+
testSender.shutdown();
280+
}
281+
}
282+
283+
@Test
284+
void send_ioException_callsOnError() throws Exception {
285+
doThrow(new IOException("send failed")).when(mockHttpClient).send(any(), any());
286+
287+
JdkHttpSender testSender =
288+
new JdkHttpSender(
289+
mockHttpClient,
290+
URI.create("http://localhost"),
291+
"text/plain",
292+
null,
293+
Duration.ofSeconds(10),
294+
Collections::emptyMap,
295+
null,
296+
null,
297+
Long.MAX_VALUE);
298+
299+
try {
300+
CountDownLatch latch = new CountDownLatch(1);
301+
AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
302+
AtomicReference<Throwable> errorRef = new AtomicReference<>();
303+
304+
testSender.send(
305+
new NoOpRequestBodyWriter(),
306+
response -> {
307+
responseRef.set(response);
308+
latch.countDown();
309+
},
310+
error -> {
311+
errorRef.set(error);
312+
latch.countDown();
313+
});
314+
315+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
316+
assertThat(errorRef.get()).isNotNull();
317+
assertThat(errorRef.get()).hasRootCauseInstanceOf(IOException.class);
318+
assertThat(errorRef.get()).hasRootCauseMessage("send failed");
319+
assertThat(responseRef.get()).isNull();
320+
} finally {
321+
testSender.shutdown();
322+
}
323+
}
324+
325+
@Test
326+
void send_rejectedExecution_callsOnError() {
327+
ThreadPoolExecutor executor =
328+
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
329+
executor.shutdown();
330+
331+
JdkHttpSender testSender =
332+
new JdkHttpSender(
333+
mockHttpClient,
334+
URI.create("http://localhost"),
335+
"text/plain",
336+
null,
337+
Duration.ofSeconds(10),
338+
Collections::emptyMap,
339+
null,
340+
executor,
341+
Long.MAX_VALUE);
342+
343+
AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
344+
AtomicReference<Throwable> errorRef = new AtomicReference<>();
345+
346+
testSender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set);
347+
348+
assertThat(errorRef.get()).isNotNull();
349+
assertThat(errorRef.get()).isInstanceOf(RejectedExecutionException.class);
350+
assertThat(responseRef.get()).isNull();
351+
}
352+
192353
private static class NoOpRequestBodyWriter implements MessageWriter {
193354
@Override
194355
public void writeMessage(OutputStream output) {}

0 commit comments

Comments
 (0)