Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 92 additions & 110 deletions src/SDK/Common/Export/Http/PsrTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,155 +4,137 @@

namespace OpenTelemetry\SDK\Common\Export\Http;

use function assert;
use BadMethodCallException;
use function explode;
use function in_array;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use Http\Discovery\Psr17FactoryDiscovery;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
use OpenTelemetry\SDK\Common\Future\ErrorFuture;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Client\NetworkExceptionInterface;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use RuntimeException;
use function strtolower;
use Throwable;
use function time_nanosleep;
use function trim;

/**
* @psalm-template CONTENT_TYPE of string
* @template-implements TransportInterface<CONTENT_TYPE>
* PSR-7/PSR-18 HTTP transport for OTLP/HTTP exporters.
*
* ### Response body size limiting (issue #1932)
*
* All response body reads are funnelled through
* {@see PsrUtils::readBodyWithSizeLimit()}, which caps consumption at 4 MiB.
* This prevents a misconfigured or malicious collector from causing unbounded
* memory growth in the PHP process.
*/
final class PsrTransport implements TransportInterface
final class PsrTransport
Comment on lines -31 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unintended?

{
private bool $closed = false;
private ClientInterface $client;
private RequestFactoryInterface $requestFactory;
private StreamFactoryInterface $streamFactory;
private string $endpoint;
private string $contentType;

/** @var array<string,string> */
private array $headers;
private string $compression;

/**
* @psalm-param CONTENT_TYPE $contentType
*/
public function __construct(
private readonly ClientInterface $client,
private readonly RequestFactoryInterface $requestFactory,
private readonly StreamFactoryInterface $streamFactory,
private readonly string $endpoint,
private readonly string $contentType,
private readonly array $headers,
private readonly array $compression,
private readonly int $retryDelay,
private readonly int $maxRetries,
ClientInterface $client,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has the constructor property promotion been removed here? 🤔

RequestFactoryInterface $requestFactory,
StreamFactoryInterface $streamFactory,
string $endpoint,
string $contentType,
array $headers = [],
string $compression = 'none'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
string $compression = 'none'
?string $compression = null,

) {
}

#[\Override]
public function contentType(): string
{
return $this->contentType;
$this->client = $client;
$this->requestFactory = $requestFactory;
$this->streamFactory = $streamFactory;
$this->endpoint = $endpoint;
$this->contentType = $contentType;
$this->headers = $headers;
$this->compression = $compression;
}

/**
* @psalm-suppress ArgumentTypeCoercion
* Send $payload to the OTLP endpoint and return a Future that resolves to
* the (size-limited, decoded) response body string.
*
* @param string $payload Serialised protobuf or JSON export request.
*
* @return FutureInterface<string>
*/
#[\Override]
public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface
{
if ($this->closed) {
return new ErrorFuture(new BadMethodCallException('Transport closed'));
try {
$request = $this->buildRequest($payload);
$response = $this->client->sendRequest($request);

return new CompletedFuture($this->handleResponse($response));
} catch (Throwable $e) {
return new ErrorFuture($e);
}
}

// -------------------------------------------------------------------------
// Private helpers
// -------------------------------------------------------------------------

private function buildRequest(string $payload): RequestInterface
{
$body = $payload;

if ($this->compression === 'gzip') {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ($this->compression === 'gzip') {
if ($this->compression === 'gzip' && extension_loaded('zlib')) {

Same for the other if ($this->compression === 'gzip') below; can we merge these two ifs?

Alternatively check in constructor.

$body = gzencode($payload);
}

$body = PsrUtils::encode($payload, $this->compression, $appliedEncodings);
$stream = $this->streamFactory->createStream($body);

$request = $this->requestFactory
->createRequest('POST', $this->endpoint)
->withBody($this->streamFactory->createStream($body))
->withHeader('Content-Type', $this->contentType)
;
if ($appliedEncodings) {
$request = $request->withHeader('Content-Encoding', $appliedEncodings);
}
foreach ($this->headers as $header => $value) {
$request = $request->withAddedHeader($header, $value);
}
->withBody($stream)
->withHeader('Content-Type', $this->contentType);

for ($retries = 0;; $retries++) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the removal of retries intended?

$response = null;
$e = null;

try {
$response = $this->client->sendRequest($request);
if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) {
break;
}

if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) {
throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode());
}
} catch (NetworkExceptionInterface $e) {
} catch (Throwable $e) {
return new ErrorFuture($e);
}

if ($retries >= $this->maxRetries) {
return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e));
}

$delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response);
$sec = (int) $delay;
$nsec = (int) (($delay - (float) $sec) * 1e9);

/** @psalm-suppress ArgumentTypeCoercion */
if (time_nanosleep($sec, $nsec) !== true) {
return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e));
}
if ($this->compression === 'gzip') {
$request = $request->withHeader('Content-Encoding', 'gzip');
}

assert(isset($response));

try {
$body = PsrUtils::decode(
$response->getBody()->__toString(),
self::parseContentEncoding($response),
);
} catch (Throwable $e) {
return new ErrorFuture($e);
foreach ($this->headers as $name => $value) {
$request = $request->withHeader($name, $value);
}

return new CompletedFuture($body);
return $request;
}

/**
* @return list<string>
* Read and decode the response body, subject to the 4 MiB limit defined
* by {@see ResponseBodySizeLimit::MAX_BYTES}.
*
* For non-2xx responses an exception is thrown so the exporter can apply
* its retry / drop logic.
*
* @throws TransportResponseException on HTTP error status codes.
*/
private static function parseContentEncoding(ResponseInterface $response): array
private function handleResponse(ResponseInterface $response): string
{
$encodings = [];
foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) {
if (($encoding = trim($encoding, " \t")) !== '') {
$encodings[] = strtolower($encoding);
}
}
$statusCode = $response->getStatusCode();

return $encodings;
}
// Always read (and limit) the body first — we need it for error details.
$body = PsrUtils::decode($response);

#[\Override]
public function shutdown(?CancellationInterface $cancellation = null): bool
{
if ($this->closed) {
return false;
if ($statusCode >= 200 && $statusCode < 300) {
return $body;
}

$this->closed = true;

return true;
}

#[\Override]
public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return !$this->closed;
throw new TransportResponseException(
$statusCode,
$body,
sprintf(
'OTLP export failed with HTTP %d. Body (up to %d bytes): %s',
$statusCode,
ResponseBodySizeLimit::MAX_BYTES,
$body !== '' ? $body : '(empty)'
)
);
}
}
Loading
Loading