Skip to main content
Drupal API
User account menu
  • Log in

Breadcrumb

  1. Drupal Core 11.1.x
  2. BatchSpanProcessor.php

class BatchSpanProcessor

Hierarchy

  • class \OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor implements \OpenTelemetry\SDK\Trace\SpanProcessorInterface uses \OpenTelemetry\API\Behavior\LogsMessagesTrait

Expanded class hierarchy of BatchSpanProcessor

1 file declares its use of BatchSpanProcessor
SpanProcessorFactory.php in vendor/open-telemetry/sdk/Trace/SpanProcessorFactory.php

File

vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessor.php, line 26

Namespace

OpenTelemetry\SDK\Trace\SpanProcessor
View source
class BatchSpanProcessor implements SpanProcessorInterface {
    use LogsMessagesTrait;
    public const DEFAULT_SCHEDULE_DELAY = 5000;
    public const DEFAULT_EXPORT_TIMEOUT = 30000;
    public const DEFAULT_MAX_QUEUE_SIZE = 2048;
    public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
    private const ATTRIBUTES_PROCESSOR = [
        'processor' => 'batching',
    ];
    private const ATTRIBUTES_QUEUED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'queued',
    ];
    private const ATTRIBUTES_PENDING = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'pending',
    ];
    private const ATTRIBUTES_PROCESSED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'processed',
    ];
    private const ATTRIBUTES_DROPPED = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'dropped',
    ];
    private const ATTRIBUTES_FREE = self::ATTRIBUTES_PROCESSOR + [
        'state' => 'free',
    ];
    private int $maxQueueSize;
    private int $scheduledDelayNanos;
    private int $maxExportBatchSize;
    private ContextInterface $exportContext;
    private ?int $nextScheduledRun = null;
    private bool $running = false;
    private int $dropped = 0;
    private int $processed = 0;
    private int $batchId = 0;
    private int $queueSize = 0;
    
    /** @var list<SpanDataInterface> */
    private array $batch = [];
    
    /** @var SplQueue<list<SpanDataInterface>> */
    private SplQueue $queue;
    
    /** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */
    private SplQueue $flush;
    private bool $closed = false;
    public function __construct(SpanExporterInterface $exporter, ClockInterface $clock, int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE, int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY, int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT, int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE, bool $autoFlush = true, ?MeterProviderInterface $meterProvider = null) {
        if ($maxQueueSize <= 0) {
            throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
        }
        if ($scheduledDelayMillis <= 0) {
            throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
        }
        if ($exportTimeoutMillis <= 0) {
            throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
        }
        if ($maxExportBatchSize <= 0) {
            throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
        }
        if ($maxExportBatchSize > $maxQueueSize) {
            throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
        }
        $this->maxQueueSize = $maxQueueSize;
        $this->scheduledDelayNanos = $scheduledDelayMillis * 1000000;
        $this->maxExportBatchSize = $maxExportBatchSize;
        $this->exportContext = Context::getCurrent();
        $this->queue = new SplQueue();
        $this->flush = new SplQueue();
        if ($meterProvider === null) {
            return;
        }
        $meter = $meterProvider->getMeter('io.opentelemetry.sdk');
        $meter->createObservableUpDownCounter('otel.trace.span_processor.spans', '{spans}', 'The number of sampled spans received by the span processor')
            ->observe(function (ObserverInterface $observer) : void {
            $queued = $this->queue
                ->count() * $this->maxExportBatchSize + count($this->batch);
            $pending = $this->queueSize - $queued;
            $processed = $this->processed;
            $dropped = $this->dropped;
            $observer->observe($queued, self::ATTRIBUTES_QUEUED);
            $observer->observe($pending, self::ATTRIBUTES_PENDING);
            $observer->observe($processed, self::ATTRIBUTES_PROCESSED);
            $observer->observe($dropped, self::ATTRIBUTES_DROPPED);
        });
        $meter->createObservableUpDownCounter('otel.trace.span_processor.queue.limit', '{spans}', 'The queue size limit')
            ->observe(function (ObserverInterface $observer) : void {
            $observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR);
        });
        $meter->createObservableUpDownCounter('otel.trace.span_processor.queue.usage', '{spans}', 'The current queue usage')
            ->observe(function (ObserverInterface $observer) : void {
            $queued = $this->queue
                ->count() * $this->maxExportBatchSize + count($this->batch);
            $pending = $this->queueSize - $queued;
            $free = $this->maxQueueSize - $this->queueSize;
            $observer->observe($queued, self::ATTRIBUTES_QUEUED);
            $observer->observe($pending, self::ATTRIBUTES_PENDING);
            $observer->observe($free, self::ATTRIBUTES_FREE);
        });
    }
    public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext) : void {
    }
    public function onEnd(ReadableSpanInterface $span) : void {
        if ($this->closed) {
            return;
        }
        if (!$span->getContext()
            ->isSampled()) {
            return;
        }
        if ($this->queueSize === $this->maxQueueSize) {
            $this->dropped++;
            return;
        }
        $this->queueSize++;
        $this->batch[] = $span->toSpanData();
        $this->nextScheduledRun ??= $this->clock
            ->now() + $this->scheduledDelayNanos;
        if (count($this->batch) === $this->maxExportBatchSize) {
            $this->enqueueBatch();
        }
        if ($this->autoFlush) {
            $this->flush();
        }
    }
    public function forceFlush(?CancellationInterface $cancellation = null) : bool {
        if ($this->closed) {
            return false;
        }
        return $this->flush(__FUNCTION__, $cancellation);
    }
    public function shutdown(?CancellationInterface $cancellation = null) : bool {
        if ($this->closed) {
            return false;
        }
        $this->closed = true;
        return $this->flush(__FUNCTION__, $cancellation);
    }
    public static function builder(SpanExporterInterface $exporter) : BatchSpanProcessorBuilder {
        return new BatchSpanProcessorBuilder($exporter);
    }
    private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null) : bool {
        if ($flushMethod !== null) {
            $flushId = $this->batchId + $this->queue
                ->count() + (int) (bool) $this->batch;
            $this->flush
                ->enqueue([
                $flushId,
                $flushMethod,
                $cancellation,
                !$this->running,
                Context::getCurrent(),
            ]);
        }
        if ($this->running) {
            return false;
        }
        $success = true;
        $exception = null;
        $this->running = true;
        try {
            for (;;) {
                while (!$this->flush
                    ->isEmpty() && $this->flush
                    ->bottom()[0] <= $this->batchId) {
                    [
                        ,
                        $flushMethod,
                        $cancellation,
                        $propagateResult,
                        $context,
                    ] = $this->flush
                        ->dequeue();
                    $scope = $context->activate();
                    try {
                        $result = $this->exporter
                            ->{$flushMethod}($cancellation);
                        if ($propagateResult) {
                            $success = $result;
                        }
                    } catch (Throwable $e) {
                        if ($propagateResult) {
                            $exception = $e;
                        }
                        else {
                            self::logError(sprintf('Unhandled %s error', $flushMethod), [
                                'exception' => $e,
                            ]);
                        }
                    } finally {
                        $scope->detach();
                    }
                }
                if (!$this->shouldFlush()) {
                    break;
                }
                if ($this->queue
                    ->isEmpty()) {
                    $this->enqueueBatch();
                }
                $batchSize = count($this->queue
                    ->bottom());
                $this->batchId++;
                $scope = $this->exportContext
                    ->activate();
                try {
                    $this->exporter
                        ->export($this->queue
                        ->dequeue())
                        ->await();
                } catch (Throwable $e) {
                    self::logError('Unhandled export error', [
                        'exception' => $e,
                    ]);
                } finally {
                    $this->processed += $batchSize;
                    $this->queueSize -= $batchSize;
                    $scope->detach();
                }
            }
        } finally {
            $this->running = false;
        }
        if ($exception !== null) {
            throw $exception;
        }
        return $success;
    }
    private function shouldFlush() : bool {
        return !$this->flush
            ->isEmpty() || $this->autoFlush && !$this->queue
            ->isEmpty() || $this->autoFlush && $this->nextScheduledRun !== null && $this->clock
            ->now() > $this->nextScheduledRun;
    }
    private function enqueueBatch() : void {
        assert($this->batch !== []);
        $this->queue
            ->enqueue($this->batch);
        $this->batch = [];
        $this->nextScheduledRun = null;
    }

}

Members

Title Sort descending Modifiers Object type Summary Overriden Title
BatchSpanProcessor::$batch private property @var list&lt;SpanDataInterface&gt;
BatchSpanProcessor::$batchId private property
BatchSpanProcessor::$closed private property
BatchSpanProcessor::$dropped private property
BatchSpanProcessor::$exportContext private property
BatchSpanProcessor::$flush private property @var SplQueue&lt;array{int, string, ?CancellationInterface, bool, ContextInterface}&gt;
BatchSpanProcessor::$maxExportBatchSize private property
BatchSpanProcessor::$maxQueueSize private property
BatchSpanProcessor::$nextScheduledRun private property
BatchSpanProcessor::$processed private property
BatchSpanProcessor::$queue private property @var SplQueue&lt;list&lt;SpanDataInterface&gt;&gt;
BatchSpanProcessor::$queueSize private property
BatchSpanProcessor::$running private property
BatchSpanProcessor::$scheduledDelayNanos private property
BatchSpanProcessor::ATTRIBUTES_DROPPED private constant
BatchSpanProcessor::ATTRIBUTES_FREE private constant
BatchSpanProcessor::ATTRIBUTES_PENDING private constant
BatchSpanProcessor::ATTRIBUTES_PROCESSED private constant
BatchSpanProcessor::ATTRIBUTES_PROCESSOR private constant
BatchSpanProcessor::ATTRIBUTES_QUEUED private constant
BatchSpanProcessor::builder public static function
BatchSpanProcessor::DEFAULT_EXPORT_TIMEOUT public constant
BatchSpanProcessor::DEFAULT_MAX_EXPORT_BATCH_SIZE public constant
BatchSpanProcessor::DEFAULT_MAX_QUEUE_SIZE public constant
BatchSpanProcessor::DEFAULT_SCHEDULE_DELAY public constant
BatchSpanProcessor::enqueueBatch private function
BatchSpanProcessor::flush private function
BatchSpanProcessor::forceFlush public function Export all ended spans to the configured Exporter that have not yet been exported.
Returns `true` if the flush was successful, otherwise `false`.
Overrides SpanProcessorInterface::forceFlush
BatchSpanProcessor::onEnd public function Overrides SpanProcessorInterface::onEnd
BatchSpanProcessor::onStart public function Overrides SpanProcessorInterface::onStart
BatchSpanProcessor::shouldFlush private function
BatchSpanProcessor::shutdown public function Cleanup; after shutdown, calling onStart, onEnd, or forceFlush is invalid
Returns `false` is the processor is already shutdown, otherwise `true`.
Overrides SpanProcessorInterface::shutdown
BatchSpanProcessor::__construct public function
LogsMessagesTrait::doLog private static function
LogsMessagesTrait::logDebug protected static function
LogsMessagesTrait::logError protected static function
LogsMessagesTrait::logInfo protected static function
LogsMessagesTrait::logNotice protected static function
LogsMessagesTrait::logWarning protected static function
LogsMessagesTrait::shouldLog private static function

API Navigation

  • Drupal Core 11.1.x
  • Topics
  • Classes
  • Functions
  • Constants
  • Globals
  • Files
  • Namespaces
  • Deprecated
  • Services
RSS feed
Powered by Drupal