class StreamFactory
@internal
Hierarchy
- class \OpenTelemetry\SDK\Metrics\MetricFactory\StreamFactory implements \OpenTelemetry\SDK\Metrics\MetricFactoryInterface
Expanded class hierarchy of StreamFactory
1 file declares its use of StreamFactory
- MeterProvider.php in vendor/
open-telemetry/ sdk/ Metrics/ MeterProvider.php
File
-
vendor/
open-telemetry/ sdk/ Metrics/ MetricFactory/ StreamFactory.php, line 37
Namespace
OpenTelemetry\SDK\Metrics\MetricFactoryView source
final class StreamFactory implements MetricFactoryInterface {
public function createAsynchronousObserver(MetricRegistryInterface $registry, ResourceInfo $resource, InstrumentationScopeInterface $instrumentationScope, Instrument $instrument, int $timestamp, iterable $views) : array {
$streams = [];
$dedup = [];
foreach ($views as [
$view,
$registration,
]) {
if ($view->aggregation === null) {
continue;
}
$dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
if (($streamId = $dedup[$dedupId] ?? null) === null) {
$stream = new AsynchronousMetricStream($view->aggregation, $timestamp);
$streamId = $registry->registerAsynchronousStream($instrument, $stream, new MetricAggregatorFactory($this->attributeProcessor($view->attributeKeys), $view->aggregation));
$streams[$streamId] = $stream;
$dedup[$dedupId] = $streamId;
}
$this->registerSource($view, $instrument, $instrumentationScope, $resource, $streams[$streamId], $registry, $registration, $streamId);
}
return array_keys($streams);
}
public function createSynchronousWriter(MetricRegistryInterface $registry, ResourceInfo $resource, InstrumentationScopeInterface $instrumentationScope, Instrument $instrument, int $timestamp, iterable $views, ?ExemplarFilterInterface $exemplarFilter = null) : array {
$streams = [];
$dedup = [];
foreach ($views as [
$view,
$registration,
]) {
if ($view->aggregation === null) {
continue;
}
$dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
if (($streamId = $dedup[$dedupId] ?? null) === null) {
$stream = new SynchronousMetricStream($view->aggregation, $timestamp);
$streamId = $registry->registerSynchronousStream($instrument, $stream, new MetricAggregator($this->attributeProcessor($view->attributeKeys), $view->aggregation, $this->createExemplarReservoir($view->aggregation, $exemplarFilter)));
$streams[$streamId] = $stream;
$dedup[$dedupId] = $streamId;
}
$this->registerSource($view, $instrument, $instrumentationScope, $resource, $streams[$streamId], $registry, $registration, $streamId);
}
return array_keys($streams);
}
private function attributeProcessor(?array $attributeKeys) : ?AttributeProcessorInterface {
return $attributeKeys !== null ? new FilteredAttributeProcessor($attributeKeys) : null;
}
private function createExemplarReservoir(AggregationInterface $aggregation, ?ExemplarFilterInterface $exemplarFilter) : ?ExemplarReservoirInterface {
if (!$exemplarFilter) {
return null;
}
if ($aggregation instanceof ExplicitBucketHistogramAggregation && $aggregation->boundaries) {
$exemplarReservoir = new HistogramBucketReservoir($aggregation->boundaries);
}
else {
$exemplarReservoir = new FixedSizeReservoir();
}
return new FilteredReservoir($exemplarReservoir, $exemplarFilter);
}
private function registerSource(ViewProjection $view, Instrument $instrument, InstrumentationScopeInterface $instrumentationScope, ResourceInfo $resource, MetricStreamInterface $stream, MetricCollectorInterface $metricCollector, MetricRegistrationInterface $metricRegistration, int $streamId) : void {
$provider = new StreamMetricSourceProvider($view, $instrument, $instrumentationScope, $resource, $stream, $metricCollector, $streamId);
$metricRegistration->register($provider, $provider);
}
private function streamId(AggregationInterface $aggregation, ?array $attributeKeys) : string {
return $this->trySerialize($aggregation) . serialize($attributeKeys);
}
private function trySerialize(object $object) {
try {
return serialize($object);
} catch (Throwable) {
}
return spl_object_id($object);
}
}
Members
Title Sort descending | Modifiers | Object type | Summary | Overriden Title |
---|---|---|---|---|
StreamFactory::attributeProcessor | private | function | ||
StreamFactory::createAsynchronousObserver | public | function | Overrides MetricFactoryInterface::createAsynchronousObserver | |
StreamFactory::createExemplarReservoir | private | function | ||
StreamFactory::createSynchronousWriter | public | function | Overrides MetricFactoryInterface::createSynchronousWriter | |
StreamFactory::registerSource | private | function | ||
StreamFactory::streamId | private | function | ||
StreamFactory::trySerialize | private | function |