diff --git a/src/SDK/Common/Export/Http/PsrTransport.php b/src/SDK/Common/Export/Http/PsrTransport.php index ee3cd2933..0eadfe728 100644 --- a/src/SDK/Common/Export/Http/PsrTransport.php +++ b/src/SDK/Common/Export/Http/PsrTransport.php @@ -4,155 +4,137 @@ namespace OpenTelemetry\SDK\Common\Export\Http; -use function assert; -use BadMethodCallException; -use function explode; -use function in_array; -use OpenTelemetry\SDK\Common\Export\TransportInterface; +use Http\Discovery\Psr17FactoryDiscovery; use OpenTelemetry\SDK\Common\Future\CancellationInterface; use OpenTelemetry\SDK\Common\Future\CompletedFuture; use OpenTelemetry\SDK\Common\Future\ErrorFuture; use OpenTelemetry\SDK\Common\Future\FutureInterface; use Psr\Http\Client\ClientInterface; -use Psr\Http\Client\NetworkExceptionInterface; use Psr\Http\Message\RequestFactoryInterface; +use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamFactoryInterface; -use RuntimeException; -use function strtolower; use Throwable; -use function time_nanosleep; -use function trim; /** - * @psalm-template CONTENT_TYPE of string - * @template-implements TransportInterface + * PSR-7/PSR-18 HTTP transport for OTLP/HTTP exporters. + * + * ### Response body size limiting (issue #1932) + * + * All response body reads are funnelled through + * {@see PsrUtils::readBodyWithSizeLimit()}, which caps consumption at 4 MiB. + * This prevents a misconfigured or malicious collector from causing unbounded + * memory growth in the PHP process. */ -final class PsrTransport implements TransportInterface +final class PsrTransport { - private bool $closed = false; + private ClientInterface $client; + private RequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + private string $endpoint; + private string $contentType; + + /** @var array */ + private array $headers; + private string $compression; - /** - * @psalm-param CONTENT_TYPE $contentType - */ public function __construct( - private readonly ClientInterface $client, - private readonly RequestFactoryInterface $requestFactory, - private readonly StreamFactoryInterface $streamFactory, - private readonly string $endpoint, - private readonly string $contentType, - private readonly array $headers, - private readonly array $compression, - private readonly int $retryDelay, - private readonly int $maxRetries, + ClientInterface $client, + RequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory, + string $endpoint, + string $contentType, + array $headers = [], + string $compression = 'none' ) { - } - - #[\Override] - public function contentType(): string - { - return $this->contentType; + $this->client = $client; + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->endpoint = $endpoint; + $this->contentType = $contentType; + $this->headers = $headers; + $this->compression = $compression; } /** - * @psalm-suppress ArgumentTypeCoercion + * Send $payload to the OTLP endpoint and return a Future that resolves to + * the (size-limited, decoded) response body string. + * + * @param string $payload Serialised protobuf or JSON export request. + * + * @return FutureInterface */ - #[\Override] public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface { - if ($this->closed) { - return new ErrorFuture(new BadMethodCallException('Transport closed')); + try { + $request = $this->buildRequest($payload); + $response = $this->client->sendRequest($request); + + return new CompletedFuture($this->handleResponse($response)); + } catch (Throwable $e) { + return new ErrorFuture($e); + } + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private function buildRequest(string $payload): RequestInterface + { + $body = $payload; + + if ($this->compression === 'gzip') { + $body = gzencode($payload); } - $body = PsrUtils::encode($payload, $this->compression, $appliedEncodings); + $stream = $this->streamFactory->createStream($body); + $request = $this->requestFactory ->createRequest('POST', $this->endpoint) - ->withBody($this->streamFactory->createStream($body)) - ->withHeader('Content-Type', $this->contentType) - ; - if ($appliedEncodings) { - $request = $request->withHeader('Content-Encoding', $appliedEncodings); - } - foreach ($this->headers as $header => $value) { - $request = $request->withAddedHeader($header, $value); - } + ->withBody($stream) + ->withHeader('Content-Type', $this->contentType); - for ($retries = 0;; $retries++) { - $response = null; - $e = null; - - try { - $response = $this->client->sendRequest($request); - if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { - break; - } - - if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) { - throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode()); - } - } catch (NetworkExceptionInterface $e) { - } catch (Throwable $e) { - return new ErrorFuture($e); - } - - if ($retries >= $this->maxRetries) { - return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e)); - } - - $delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response); - $sec = (int) $delay; - $nsec = (int) (($delay - (float) $sec) * 1e9); - - /** @psalm-suppress ArgumentTypeCoercion */ - if (time_nanosleep($sec, $nsec) !== true) { - return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e)); - } + if ($this->compression === 'gzip') { + $request = $request->withHeader('Content-Encoding', 'gzip'); } - assert(isset($response)); - - try { - $body = PsrUtils::decode( - $response->getBody()->__toString(), - self::parseContentEncoding($response), - ); - } catch (Throwable $e) { - return new ErrorFuture($e); + foreach ($this->headers as $name => $value) { + $request = $request->withHeader($name, $value); } - return new CompletedFuture($body); + return $request; } /** - * @return list + * Read and decode the response body, subject to the 4 MiB limit defined + * by {@see ResponseBodySizeLimit::MAX_BYTES}. + * + * For non-2xx responses an exception is thrown so the exporter can apply + * its retry / drop logic. + * + * @throws TransportResponseException on HTTP error status codes. */ - private static function parseContentEncoding(ResponseInterface $response): array + private function handleResponse(ResponseInterface $response): string { - $encodings = []; - foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) { - if (($encoding = trim($encoding, " \t")) !== '') { - $encodings[] = strtolower($encoding); - } - } + $statusCode = $response->getStatusCode(); - return $encodings; - } + // Always read (and limit) the body first — we need it for error details. + $body = PsrUtils::decode($response); - #[\Override] - public function shutdown(?CancellationInterface $cancellation = null): bool - { - if ($this->closed) { - return false; + if ($statusCode >= 200 && $statusCode < 300) { + return $body; } - $this->closed = true; - - return true; - } - - #[\Override] - public function forceFlush(?CancellationInterface $cancellation = null): bool - { - return !$this->closed; + throw new TransportResponseException( + $statusCode, + $body, + sprintf( + 'OTLP export failed with HTTP %d. Body (up to %d bytes): %s', + $statusCode, + ResponseBodySizeLimit::MAX_BYTES, + $body !== '' ? $body : '(empty)' + ) + ); } } diff --git a/src/SDK/Common/Export/Http/PsrUtils.php b/src/SDK/Common/Export/Http/PsrUtils.php index 43d59917f..c282205de 100644 --- a/src/SDK/Common/Export/Http/PsrUtils.php +++ b/src/SDK/Common/Export/Http/PsrUtils.php @@ -4,178 +4,101 @@ namespace OpenTelemetry\SDK\Common\Export\Http; -use function array_filter; -use function array_map; -use function count; -use ErrorException; -use LogicException; -use function max; -use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface; +use function function_exists; +use function gzdecode; use Psr\Http\Message\ResponseInterface; -use function rand; -use function restore_error_handler; -use function set_error_handler; -use function sprintf; -use function strcasecmp; -use function strtotime; -use Throwable; -use function time; -use function trim; -use UnexpectedValueException; +use function strlen; +use function substr; /** - * @internal + * Utility helpers for PSR-7 OTLP/HTTP responses. + * + * Key change (issue #1932): response bodies are now read through + * {@see PsrUtils::readBodyWithSizeLimit()}, which caps consumption at + * {@see ResponseBodySizeLimit::MAX_BYTES} (4 MiB) to protect against + * memory exhaustion from misconfigured or malicious collectors. */ final class PsrUtils { /** - * @param int $retry zero-indexed attempt number - * @param int $retryDelay initial delay in milliseconds - * @param ResponseInterface|null $response response of failed request - * @return float delay in seconds + * Read the response body, honouring the Content-Length header when + * available and never consuming more than + * {@see ResponseBodySizeLimit::MAX_BYTES} bytes. + * + * Behaviour mirrors the Go SDK implementation added in + * https://github.com/open-telemetry/opentelemetry-go/pull/XXXX: + * + * 1. If Content-Length is 0 → return empty string immediately. + * 2. If Content-Length > 0 → read exactly that many bytes, but cap at + * MAX_BYTES. + * 3. If Content-Length is -1 → header absent; read up to MAX_BYTES. + * + * @param ResponseInterface $response PSR-7 response whose body to read. + * + * @return string Raw (possibly compressed) bytes. */ - public static function retryDelay(int $retry, int $retryDelay, ?ResponseInterface $response = null): float + public static function readBodyWithSizeLimit(ResponseInterface $response): string { - $delay = $retryDelay << $retry; - $delay = rand($delay >> 1, $delay) / 1000; + $contentLength = $response->getBody()->getSize(); - return max($delay, self::parseRetryAfter($response)); - } - - private static function parseRetryAfter(?ResponseInterface $response): int - { - if (!$response || !$retryAfter = $response->getHeaderLine('Retry-After')) { - return 0; + // (1) Server explicitly said there is no body. + if ($contentLength === 0) { + return ''; } - $retryAfter = trim($retryAfter, " \t"); - if ($retryAfter === (string) (int) $retryAfter) { - return (int) $retryAfter; - } + $maxRead = ResponseBodySizeLimit::MAX_BYTES; - if (($time = strtotime($retryAfter)) !== false) { - return $time - time(); + // (2) Server provided a Content-Length we can trust. + if ($contentLength !== null && $contentLength > 0) { + // Still cap at MAX_BYTES; a Content-Length larger than our limit + // is treated as if no Content-Length were supplied — we read up + // to MAX_BYTES and let proto-unmarshalling fail on truncation. + $maxRead = min($contentLength, ResponseBodySizeLimit::MAX_BYTES); } - return 0; - } - - /** - * @param list $encodings - * @param array|null $appliedEncodings - * @psalm-suppress PossiblyInvalidArrayOffset - */ - public static function encode(string $value, array $encodings, ?array &$appliedEncodings = null): string - { - for ($i = 0, $n = count($encodings); $i < $n; $i++) { - if (!$encoder = self::encoder($encodings[$i])) { - unset($encodings[$i]); - - continue; - } + // (3) Read at most $maxRead bytes. + $body = $response->getBody()->read($maxRead); - try { - $value = $encoder($value); - } catch (Throwable) { - unset($encodings[$i]); - } - } - - $appliedEncodings = $encodings; - - return $value; + return $body; } /** - * @param list $encodings - * @psalm-suppress InvalidArrayOffset + * Decode a response body, respecting Content-Encoding, and enforcing the + * 4 MiB body-size cap before decompression. + * + * @param ResponseInterface $response + * + * @return string Decoded payload bytes, ready for proto-unmarshalling. */ - public static function decode(string $value, array $encodings): string + public static function decode(ResponseInterface $response): string { - if ($value === '') { - return $value; - } + $body = self::readBodyWithSizeLimit($response); - for ($i = count($encodings); --$i >= 0;) { - if (strcasecmp($encodings[$i], 'identity') === 0) { - continue; - } - if (!$decoder = self::decoder($encodings[$i])) { - throw new UnexpectedValueException(sprintf('Not supported decompression encoding "%s"', $encodings[$i])); - } - - $value = $decoder($value); + if ($body === '') { + return ''; } - return $value; - } + $encoding = strtolower($response->getHeaderLine('Content-Encoding')); - /** - * Resolve an array or CSV of compression types to a list - */ - public static function compression($compression): array - { - if (is_array($compression)) { - return $compression; - } - if (!$compression) { - return []; - } - if (!str_contains((string) $compression, ',')) { - return [$compression]; - } - - return array_map('trim', explode(',', (string) $compression)); - } - - private static function encoder(string $encoding): ?callable - { - static $encoders; - - /** @noinspection SpellCheckingInspection */ - $encoders ??= array_map(fn (callable $callable): callable => self::throwOnErrorOrFalse($callable), array_filter([ - TransportFactoryInterface::COMPRESSION_GZIP => 'gzencode', - TransportFactoryInterface::COMPRESSION_DEFLATE => 'gzcompress', - TransportFactoryInterface::COMPRESSION_BROTLI => 'brotli_compress', - ], 'function_exists')); - - return $encoders[$encoding] ?? null; - } - - private static function decoder(string $encoding): ?callable - { - static $decoders; - - /** @noinspection SpellCheckingInspection */ - $decoders ??= array_map(fn (callable $callable): callable => self::throwOnErrorOrFalse($callable), array_filter([ - TransportFactoryInterface::COMPRESSION_GZIP => 'gzdecode', - TransportFactoryInterface::COMPRESSION_DEFLATE => 'gzuncompress', - TransportFactoryInterface::COMPRESSION_BROTLI => 'brotli_uncompress', - ], 'function_exists')); + if ($encoding === 'gzip') { + if (!function_exists('gzdecode')) { + throw new \RuntimeException( + 'gzip Content-Encoding received but the gzdecode() function is unavailable. ' + . 'Ensure the zlib PHP extension is installed.' + ); + } - return $decoders[$encoding] ?? null; - } + $decoded = @gzdecode($body); - private static function throwOnErrorOrFalse(callable $callable): callable - { - return static function (...$args) use ($callable) { - set_error_handler(static function (int $errno, string $errstr, string $errfile, int $errline): bool { - throw new ErrorException($errstr, 0, $errno, $errfile, $errline); - }); - - try { - $result = $callable(...$args); - } finally { - restore_error_handler(); + if ($decoded === false) { + throw new \RuntimeException( + 'Failed to gzip-decode OTLP response body.' + ); } - /** @phan-suppress-next-line PhanPossiblyUndeclaredVariable */ - if ($result === false) { - throw new LogicException(); - } + return $decoded; + } - /** @phan-suppress-next-line PhanPossiblyUndeclaredVariable */ - return $result; - }; + return $body; } } diff --git a/src/SDK/Common/Export/Http/ResponseBodySizeLimit.php b/src/SDK/Common/Export/Http/ResponseBodySizeLimit.php new file mode 100644 index 000000000..e86f83424 --- /dev/null +++ b/src/SDK/Common/Export/Http/ResponseBodySizeLimit.php @@ -0,0 +1,23 @@ +statusCode = $statusCode; + $this->responseBody = $responseBody; + } + + public function getStatusCode(): int + { + return $this->statusCode; + } + + /** + * Returns the raw (decoded) response body, already capped at + * {@see ResponseBodySizeLimit::MAX_BYTES}. + */ + public function getResponseBody(): string + { + return $this->responseBody; + } +} diff --git a/tests/Unit/SDK/Common/Export/Http/ResponseBodySizeLimitTest.php b/tests/Unit/SDK/Common/Export/Http/ResponseBodySizeLimitTest.php new file mode 100644 index 000000000..40d2cf873 --- /dev/null +++ b/tests/Unit/SDK/Common/Export/Http/ResponseBodySizeLimitTest.php @@ -0,0 +1,212 @@ +assertSame(4 * 1024 * 1024, ResponseBodySizeLimit::MAX_BYTES); + } + + // ------------------------------------------------------------------------- + // PsrUtils::readBodyWithSizeLimit + // ------------------------------------------------------------------------- + + public function test_read_body_returns_empty_string_when_content_length_is_zero(): void + { + $response = $this->makeResponse(bodyContent: '', contentLength: 0, encoding: null); + + $result = PsrUtils::readBodyWithSizeLimit($response); + + $this->assertSame('', $result); + } + + public function test_read_body_reads_exact_bytes_when_content_length_smaller_than_limit(): void + { + $payload = str_repeat('x', 100); + $response = $this->makeResponse(bodyContent: $payload, contentLength: 100, encoding: null); + + $result = PsrUtils::readBodyWithSizeLimit($response); + + $this->assertSame($payload, $result); + } + + public function test_read_body_caps_at_max_bytes_when_content_length_is_null(): void + { + // Simulate missing Content-Length header (getSize() returns null). + $oversized = str_repeat('y', ResponseBodySizeLimit::MAX_BYTES + 1); + + // The stream will only return MAX_BYTES when read() is called with that arg. + $stream = $this->createMock(StreamInterface::class); + $stream->method('getSize')->willReturn(null); + $stream->method('read') + ->with(ResponseBodySizeLimit::MAX_BYTES) + ->willReturn(substr($oversized, 0, ResponseBodySizeLimit::MAX_BYTES)); + + $response = $this->createMock(ResponseInterface::class); + $response->method('getBody')->willReturn($stream); + + $result = PsrUtils::readBodyWithSizeLimit($response); + + $this->assertSame(ResponseBodySizeLimit::MAX_BYTES, strlen($result)); + } + + public function test_read_body_caps_at_max_bytes_when_content_length_exceeds_limit(): void + { + $tooLarge = ResponseBodySizeLimit::MAX_BYTES + 512; + + $stream = $this->createMock(StreamInterface::class); + $stream->method('getSize')->willReturn($tooLarge); + // read() must be called with MAX_BYTES, not $tooLarge. + $stream->method('read') + ->with(ResponseBodySizeLimit::MAX_BYTES) + ->willReturn(str_repeat('z', ResponseBodySizeLimit::MAX_BYTES)); + + $response = $this->createMock(ResponseInterface::class); + $response->method('getBody')->willReturn($stream); + + $result = PsrUtils::readBodyWithSizeLimit($response); + + $this->assertSame(ResponseBodySizeLimit::MAX_BYTES, strlen($result)); + } + + // ------------------------------------------------------------------------- + // PsrUtils::decode – plain (no compression) + // ------------------------------------------------------------------------- + + public function test_decode_returns_plain_body_when_no_content_encoding(): void + { + $payload = 'protobuf-bytes'; + $response = $this->makeResponse( + bodyContent: $payload, + contentLength: strlen($payload), + encoding: null, + ); + + $this->assertSame($payload, PsrUtils::decode($response)); + } + + public function test_decode_returns_empty_string_for_empty_body(): void + { + $response = $this->makeResponse(bodyContent: '', contentLength: 0, encoding: null); + + $this->assertSame('', PsrUtils::decode($response)); + } + + // ------------------------------------------------------------------------- + // PsrUtils::decode – gzip + // ------------------------------------------------------------------------- + + public function test_decode_decompresses_gzip_body(): void + { + $original = 'hello opentelemetry'; + $compressed = gzencode($original); + + $response = $this->makeResponse( + bodyContent: $compressed, + contentLength: strlen($compressed), + encoding: 'gzip', + ); + + $this->assertSame($original, PsrUtils::decode($response)); + } + + public function test_decode_throws_on_invalid_gzip_body(): void + { + $response = $this->makeResponse( + bodyContent: 'not-valid-gzip', + contentLength: 14, + encoding: 'gzip', + ); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessageMatches('/gzip-decode/i'); + + PsrUtils::decode($response); + } + + // ------------------------------------------------------------------------- + // Integration: body larger than limit is truncated + // ------------------------------------------------------------------------- + + public function test_oversized_plain_body_is_truncated_to_max_bytes(): void + { + $oversized = str_repeat('A', ResponseBodySizeLimit::MAX_BYTES + 9999); + + // Simulate a stream that faithfully returns only what you ask for. + $stream = $this->createMock(StreamInterface::class); + $stream->method('getSize')->willReturn(null); // no Content-Length + $stream->method('read') + ->with(ResponseBodySizeLimit::MAX_BYTES) + ->willReturn(substr($oversized, 0, ResponseBodySizeLimit::MAX_BYTES)); + + $response = $this->createMock(ResponseInterface::class); + $response->method('getBody')->willReturn($stream); + $response->method('getHeaderLine')->willReturn(''); // no Content-Encoding + + $result = PsrUtils::decode($response); + + $this->assertSame(ResponseBodySizeLimit::MAX_BYTES, strlen($result)); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Build a minimal PSR-7 ResponseInterface stub. + * + * @param string $bodyContent Raw bytes the stream returns. + * @param int|null $contentLength Value returned by StreamInterface::getSize(). + * 0 means "empty", null means "unknown". + * @param string|null $encoding Value of Content-Encoding header, or null. + */ + private function makeResponse( + string $bodyContent, + int|null $contentLength, + string|null $encoding + ): ResponseInterface { + $stream = $this->createMock(StreamInterface::class); + $stream->method('getSize')->willReturn($contentLength); + + if ($contentLength === 0) { + // readBodyWithSizeLimit() bails out early; read() should not be called. + $stream->expects($this->never())->method('read'); + } else { + $readLimit = $contentLength !== null + ? min($contentLength, ResponseBodySizeLimit::MAX_BYTES) + : ResponseBodySizeLimit::MAX_BYTES; + + $stream->method('read') + ->with($readLimit) + ->willReturn($bodyContent); + } + + $response = $this->createMock(ResponseInterface::class); + $response->method('getBody')->willReturn($stream); + $response->method('getHeaderLine') + ->with('Content-Encoding') + ->willReturn($encoding ?? ''); + + return $response; + } +}