Skip to main content
Drupal API
User account menu
  • Log in

Breadcrumb

  1. Drupal Core 11.1.x

BatchSpanProcessor.php

Namespace

OpenTelemetry\SDK\Trace\SpanProcessor

File

vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessor.php

View source
<?php

declare (strict_types=1);
namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use function assert;
use function count;
use InvalidArgumentException;
use OpenTelemetry\API\Behavior\LogsMessagesTrait;
use OpenTelemetry\API\Common\Time\ClockInterface;
use OpenTelemetry\API\Metrics\MeterProviderInterface;
use OpenTelemetry\API\Metrics\ObserverInterface;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use SplQueue;
use function sprintf;
use Throwable;
class BatchSpanProcessor implements SpanProcessorInterface {
    use LogsMessagesTrait;
    public const DEFAULT_SCHEDULE_DELAY = 5000;
    public const DEFAULT_EXPORT_TIMEOUT = 30000;
    public const DEFAULT_MAX_QUEUE_SIZE = 2048;
    public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
    private const ATTRIBUTES_PROCESSOR = [
        'processor' => 'batching',
    ];
    private const ATTRIBUTES_QUEUED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'queued',
    ];
    private const ATTRIBUTES_PENDING = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'pending',
    ];
    private const ATTRIBUTES_PROCESSED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'processed',
    ];
    private const ATTRIBUTES_DROPPED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'dropped',
    ];
    private const ATTRIBUTES_FREE = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'free',
    ];
    private int $maxQueueSize;
    private int $scheduledDelayNanos;
    private int $maxExportBatchSize;
    private ContextInterface $exportContext;
    private ?int $nextScheduledRun = null;
    private bool $running = false;
    private int $dropped = 0;
    private int $processed = 0;
    private int $batchId = 0;
    private int $queueSize = 0;
    
    /** @var list<SpanDataInterface> */
    private array $batch = [];
    
    /** @var SplQueue<list<SpanDataInterface>> */
    private SplQueue $queue;
    
    /** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */
    private SplQueue $flush;
    private bool $closed = false;
    public function __construct(SpanExporterInterface $exporter, ClockInterface $clock, int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE, int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY, int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT, int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE, bool $autoFlush = true, ?MeterProviderInterface $meterProvider = null) {
        if ($maxQueueSize <= 0) {
            throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
        }
        if ($scheduledDelayMillis <= 0) {
            throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
        }
        if ($exportTimeoutMillis <= 0) {
            throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
        }
        if ($maxExportBatchSize <= 0) {
            throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
        }
        if ($maxExportBatchSize > $maxQueueSize) {
            throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
        }
        $this->maxQueueSize = $maxQueueSize;
        $this->scheduledDelayNanos = $scheduledDelayMillis * 1000000;
        $this->maxExportBatchSize = $maxExportBatchSize;
        $this->exportContext = Context::getCurrent();
        $this->queue = new SplQueue();
        $this->flush = new SplQueue();
        if ($meterProvider === null) {
            return;
        }
        $meter = $meterProvider->getMeter('io.opentelemetry.sdk');
        $meter->createObservableUpDownCounter('otel.trace.span_processor.spans', '{spans}', 'The number of sampled spans received by the span processor')
            ->observe(function (ObserverInterface $observer) : void {
            $queued = $this->queue
                ->count() * $this->maxExportBatchSize + count($this->batch);
            $pending = $this->queueSize - $queued;
            $processed = $this->processed;
            $dropped = $this->dropped;
            $observer->observe($queued, self::ATTRIBUTES_QUEUED);
            $observer->observe($pending, self::ATTRIBUTES_PENDING);
            $observer->observe($processed, self::ATTRIBUTES_PROCESSED);
            $observer->observe($dropped, self::ATTRIBUTES_DROPPED);
        });
        $meter->createObservableUpDownCounter('otel.trace.span_processor.queue.limit', '{spans}', 'The queue size limit')
            ->observe(function (ObserverInterface $observer) : void {
            $observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR);
        });
        $meter->createObservableUpDownCounter('otel.trace.span_processor.queue.usage', '{spans}', 'The current queue usage')
            ->observe(function (ObserverInterface $observer) : void {
            $queued = $this->queue
                ->count() * $this->maxExportBatchSize + count($this->batch);
            $pending = $this->queueSize - $queued;
            $free = $this->maxQueueSize - $this->queueSize;
            $observer->observe($queued, self::ATTRIBUTES_QUEUED);
            $observer->observe($pending, self::ATTRIBUTES_PENDING);
            $observer->observe($free, self::ATTRIBUTES_FREE);
        });
    }
    public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext) : void {
    }
    public function onEnd(ReadableSpanInterface $span) : void {
        if ($this->closed) {
            return;
        }
        if (!$span->getContext()
            ->isSampled()) {
            return;
        }
        if ($this->queueSize === $this->maxQueueSize) {
            $this->dropped++;
            return;
        }
        $this->queueSize++;
        $this->batch[] = $span->toSpanData();
        $this->nextScheduledRun ??= $this->clock
            ->now() + $this->scheduledDelayNanos;
        if (count($this->batch) === $this->maxExportBatchSize) {
            $this->enqueueBatch();
        }
        if ($this->autoFlush) {
            $this->flush();
        }
    }
    public function forceFlush(?CancellationInterface $cancellation = null) : bool {
        if ($this->closed) {
            return false;
        }
        return $this->flush(__FUNCTION__, $cancellation);
    }
    public function shutdown(?CancellationInterface $cancellation = null) : bool {
        if ($this->closed) {
            return false;
        }
        $this->closed = true;
        return $this->flush(__FUNCTION__, $cancellation);
    }
    public static function builder(SpanExporterInterface $exporter) : BatchSpanProcessorBuilder {
        return new BatchSpanProcessorBuilder($exporter);
    }
    private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null) : bool {
        if ($flushMethod !== null) {
            $flushId = $this->batchId + $this->queue
                ->count() + (int) (bool) $this->batch;
            $this->flush
                ->enqueue([
                $flushId,
                $flushMethod,
                $cancellation,
                !$this->running,
                Context::getCurrent(),
            ]);
        }
        if ($this->running) {
            return false;
        }
        $success = true;
        $exception = null;
        $this->running = true;
        try {
            for (;;) {
                while (!$this->flush
                    ->isEmpty() && $this->flush
                    ->bottom()[0] <= $this->batchId) {
                    [
                        ,
                        $flushMethod,
                        $cancellation,
                        $propagateResult,
                        $context,
                    ] = $this->flush
                        ->dequeue();
                    $scope = $context->activate();
                    try {
                        $result = $this->exporter
                            ->{$flushMethod}($cancellation);
                        if ($propagateResult) {
                            $success = $result;
                        }
                    } catch (Throwable $e) {
                        if ($propagateResult) {
                            $exception = $e;
                        }
                        else {
                            self::logError(sprintf('Unhandled %s error', $flushMethod), [
                                'exception' => $e,
                            ]);
                        }
                    } finally {
                        $scope->detach();
                    }
                }
                if (!$this->shouldFlush()) {
                    break;
                }
                if ($this->queue
                    ->isEmpty()) {
                    $this->enqueueBatch();
                }
                $batchSize = count($this->queue
                    ->bottom());
                $this->batchId++;
                $scope = $this->exportContext
                    ->activate();
                try {
                    $this->exporter
                        ->export($this->queue
                        ->dequeue())
                        ->await();
                } catch (Throwable $e) {
                    self::logError('Unhandled export error', [
                        'exception' => $e,
                    ]);
                } finally {
                    $this->processed += $batchSize;
                    $this->queueSize -= $batchSize;
                    $scope->detach();
                }
            }
        } finally {
            $this->running = false;
        }
        if ($exception !== null) {
            throw $exception;
        }
        return $success;
    }
    private function shouldFlush() : bool {
        return !$this->flush
            ->isEmpty() || $this->autoFlush && !$this->queue
            ->isEmpty() || $this->autoFlush && $this->nextScheduledRun !== null && $this->clock
            ->now() > $this->nextScheduledRun;
    }
    private function enqueueBatch() : void {
        assert($this->batch !== []);
        $this->queue
            ->enqueue($this->batch);
        $this->batch = [];
        $this->nextScheduledRun = null;
    }

}

Classes

Title Deprecated Summary
BatchSpanProcessor

API Navigation

  • Drupal Core 11.1.x
  • Topics
  • Classes
  • Functions
  • Constants
  • Globals
  • Files
  • Namespaces
  • Deprecated
  • Services
RSS feed
Powered by Drupal