class AbstractDriver

Event loop driver which implements all basic operations to allow interoperability.

Newly registered or re-enabled callbacks MUST be marked as enabled immediately, but MUST only be activated (i.e., become callable) right before the next tick. Callbacks MUST NOT be called in the tick in which they were enabled.

Registered callbacks MUST NOT be called from a file with strict types enabled (`declare(strict_types=1)`).
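
The activation rule can be illustrated with a toy model. The sketch below is not Revolt code: `TinyLoop` and its methods are hypothetical names, and it models deferred callbacks only. It shows a callback that is registered during one tick being marked enabled at once, yet invoked only in the following tick.

```php
<?php

// Hypothetical toy model, not part of revolt/event-loop.
final class TinyLoop
{
    /** @var array<string, bool> Enabled flag per callback id. */
    private array $enabled = [];

    /** @var array<string, \Closure> Callbacks waiting to be activated. */
    private array $enableQueue = [];

    private string $nextId = 'a';

    /** @var int Number of the tick currently executing. */
    public int $tick = 0;

    public function defer(\Closure $closure): string
    {
        $id = $this->nextId++;
        $this->enabled[$id] = true;          // Marked enabled immediately.
        $this->enableQueue[$id] = $closure;  // Activated right before the next tick.
        return $id;
    }

    public function isEnabled(string $id): bool
    {
        return $this->enabled[$id] ?? false;
    }

    public function run(): void
    {
        while ($this->enableQueue !== []) {
            $this->tick++;
            // Take a snapshot: callbacks added while this tick runs wait for the next one.
            $batch = $this->enableQueue;
            $this->enableQueue = [];
            foreach ($batch as $id => $closure) {
                unset($this->enabled[$id]);  // Deferred callbacks run once.
                $closure($id);
            }
        }
    }
}

$loop = new TinyLoop();
$log = [];

$loop->defer(function () use ($loop, &$log): void {
    $log[] = "outer ran in tick {$loop->tick}";
    $inner = $loop->defer(function () use ($loop, &$log): void {
        $log[] = "inner ran in tick {$loop->tick}";
    });
    $log[] = 'inner enabled: ' . \var_export($loop->isEnabled($inner), true);
});

$loop->run();
echo \implode(\PHP_EOL, $log), \PHP_EOL;
```

In this model the inner callback reports as enabled while the outer one is still running, but its log entry carries the next tick number. The real driver applies the same idea through its `$enableQueue` and `$enableDeferQueue` properties.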

@internal

Hierarchy

  • class \Revolt\EventLoop\Internal\AbstractDriver implements \Revolt\EventLoop\Driver


5 files declare their use of AbstractDriver
EvDriver.php in vendor/revolt/event-loop/src/EventLoop/Driver/EvDriver.php
EventDriver.php in vendor/revolt/event-loop/src/EventLoop/Driver/EventDriver.php
EventLoop.php in vendor/revolt/event-loop/src/EventLoop.php
StreamSelectDriver.php in vendor/revolt/event-loop/src/EventLoop/Driver/StreamSelectDriver.php
UvDriver.php in vendor/revolt/event-loop/src/EventLoop/Driver/UvDriver.php

File

vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php, line 24

Namespace

Revolt\EventLoop\Internal
abstract class AbstractDriver implements Driver {
    
    /** @var string Next callback identifier. */
    private string $nextId = "a";
    private \Fiber $fiber;
    private \Fiber $callbackFiber;
    private \Closure $errorCallback;
    
    /** @var array<string, DriverCallback> */
    private array $callbacks = [];
    
    /** @var array<string, DriverCallback> */
    private array $enableQueue = [];
    
    /** @var array<string, DriverCallback> */
    private array $enableDeferQueue = [];
    
    /** @var null|\Closure(\Throwable):void */
    private ?\Closure $errorHandler = null;
    
    /** @var null|\Closure():mixed */
    private ?\Closure $interrupt = null;
    private readonly \Closure $interruptCallback;
    private readonly \Closure $queueCallback;
    private readonly \Closure $runCallback;
    private readonly \stdClass $internalSuspensionMarker;
    
    /** @var \SplQueue<array{\Closure, array}> */
    private readonly \SplQueue $microtaskQueue;
    
    /** @var \SplQueue<DriverCallback> */
    private readonly \SplQueue $callbackQueue;
    private bool $idle = false;
    private bool $stopped = false;
    
    /** @var \WeakMap<object, \WeakReference<DriverSuspension>> */
    private \WeakMap $suspensions;
    public function __construct() {
        if (\PHP_VERSION_ID < 80117 || \PHP_VERSION_ID >= 80200 && \PHP_VERSION_ID < 80204) {
            // PHP GC is broken on early 8.1 and 8.2 versions, see https://github.com/php/php-src/issues/10496
            if (!\getenv('REVOLT_DRIVER_SUPPRESS_ISSUE_10496')) {
                throw new \Error('Your version of PHP is affected by serious garbage collector bugs related to fibers. Please upgrade to a newer version of PHP, i.e. >= 8.1.17 or >= 8.2.4');
            }
        }
        $this->suspensions = new \WeakMap();
        $this->internalSuspensionMarker = new \stdClass();
        $this->microtaskQueue = new \SplQueue();
        $this->callbackQueue = new \SplQueue();
        $this->createLoopFiber();
        $this->createCallbackFiber();
        $this->createErrorCallback();
        
        /** @psalm-suppress InvalidArgument */
        $this->interruptCallback = $this->setInterrupt(...);
        $this->queueCallback = $this->queue(...);
        $this->runCallback = function () {
            if ($this->fiber
                ->isTerminated()) {
                $this->createLoopFiber();
            }
            return $this->fiber
                ->isStarted() ? $this->fiber
                ->resume() : $this->fiber
                ->start();
        };
    }
    public function run() : void {
        if ($this->fiber
            ->isRunning()) {
            throw new \Error("The event loop is already running");
        }
        if (\Fiber::getCurrent()) {
            throw new \Error(\sprintf("Can't call %s() within a fiber (i.e., outside of {main})", __METHOD__));
        }
        if ($this->fiber
            ->isTerminated()) {
            $this->createLoopFiber();
        }
        
        /** @noinspection PhpUnhandledExceptionInspection */
        $lambda = $this->fiber
            ->isStarted() ? $this->fiber
            ->resume() : $this->fiber
            ->start();
        if ($lambda) {
            $lambda();
            throw new \Error('Interrupt from event loop must throw an exception: ' . ClosureHelper::getDescription($lambda));
        }
    }
    public function stop() : void {
        $this->stopped = true;
    }
    public function isRunning() : bool {
        return $this->fiber
            ->isRunning() || $this->fiber
            ->isSuspended();
    }
    public function queue(\Closure $closure, mixed ...$args) : void {
        $this->microtaskQueue
            ->enqueue([
            $closure,
            $args,
        ]);
    }
    public function defer(\Closure $closure) : string {
        $deferCallback = new DeferCallback($this->nextId++, $closure);
        $this->callbacks[$deferCallback->id] = $deferCallback;
        $this->enableDeferQueue[$deferCallback->id] = $deferCallback;
        return $deferCallback->id;
    }
    public function delay(float $delay, \Closure $closure) : string {
        if ($delay < 0) {
            throw new \Error("Delay must be greater than or equal to zero");
        }
        $timerCallback = new TimerCallback($this->nextId++, $delay, $closure, $this->now() + $delay);
        $this->callbacks[$timerCallback->id] = $timerCallback;
        $this->enableQueue[$timerCallback->id] = $timerCallback;
        return $timerCallback->id;
    }
    public function repeat(float $interval, \Closure $closure) : string {
        if ($interval < 0) {
            throw new \Error("Interval must be greater than or equal to zero");
        }
        $timerCallback = new TimerCallback($this->nextId++, $interval, $closure, $this->now() + $interval, true);
        $this->callbacks[$timerCallback->id] = $timerCallback;
        $this->enableQueue[$timerCallback->id] = $timerCallback;
        return $timerCallback->id;
    }
    public function onReadable(mixed $stream, \Closure $closure) : string {
        $streamCallback = new StreamReadableCallback($this->nextId++, $closure, $stream);
        $this->callbacks[$streamCallback->id] = $streamCallback;
        $this->enableQueue[$streamCallback->id] = $streamCallback;
        return $streamCallback->id;
    }
    public function onWritable($stream, \Closure $closure) : string {
        $streamCallback = new StreamWritableCallback($this->nextId++, $closure, $stream);
        $this->callbacks[$streamCallback->id] = $streamCallback;
        $this->enableQueue[$streamCallback->id] = $streamCallback;
        return $streamCallback->id;
    }
    public function onSignal(int $signal, \Closure $closure) : string {
        $signalCallback = new SignalCallback($this->nextId++, $closure, $signal);
        $this->callbacks[$signalCallback->id] = $signalCallback;
        $this->enableQueue[$signalCallback->id] = $signalCallback;
        return $signalCallback->id;
    }
    public function enable(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            throw InvalidCallbackError::invalidIdentifier($callbackId);
        }
        $callback = $this->callbacks[$callbackId];
        if ($callback->enabled) {
            return $callbackId;
            // Callback already enabled.
        }
        $callback->enabled = true;
        if ($callback instanceof DeferCallback) {
            $this->enableDeferQueue[$callback->id] = $callback;
        }
        elseif ($callback instanceof TimerCallback) {
            $callback->expiration = $this->now() + $callback->interval;
            $this->enableQueue[$callback->id] = $callback;
        }
        else {
            $this->enableQueue[$callback->id] = $callback;
        }
        return $callbackId;
    }
    public function cancel(string $callbackId) : void {
        $this->disable($callbackId);
        unset($this->callbacks[$callbackId]);
    }
    public function disable(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            return $callbackId;
        }
        $callback = $this->callbacks[$callbackId];
        if (!$callback->enabled) {
            return $callbackId;
            // Callback already disabled.
        }
        $callback->enabled = false;
        $callback->invokable = false;
        $id = $callback->id;
        if ($callback instanceof DeferCallback) {
            // Callback was only queued to be enabled.
            unset($this->enableDeferQueue[$id]);
        }
        elseif (isset($this->enableQueue[$id])) {
            // Callback was only queued to be enabled.
            unset($this->enableQueue[$id]);
        }
        else {
            $this->deactivate($callback);
        }
        return $callbackId;
    }
    public function reference(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            throw InvalidCallbackError::invalidIdentifier($callbackId);
        }
        $this->callbacks[$callbackId]->referenced = true;
        return $callbackId;
    }
    public function unreference(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            return $callbackId;
        }
        $this->callbacks[$callbackId]->referenced = false;
        return $callbackId;
    }
    public function getSuspension() : Suspension {
        $fiber = \Fiber::getCurrent();
        // User callbacks are always executed outside the event loop fiber, so the current fiber should never be the loop fiber.
        \assert($fiber !== $this->fiber);
        // Use queue closure in case of {main}, which can be unset by DriverSuspension after an uncaught exception.
        $key = $fiber ?? $this->queueCallback;
        $suspension = ($this->suspensions[$key] ?? null)?->get();
        if ($suspension) {
            return $suspension;
        }
        $suspension = new DriverSuspension($this->runCallback, $this->queueCallback, $this->interruptCallback, $this->suspensions);
        $this->suspensions[$key] = \WeakReference::create($suspension);
        return $suspension;
    }
    public function setErrorHandler(?\Closure $errorHandler) : void {
        $this->errorHandler = $errorHandler;
    }
    public function getErrorHandler() : ?\Closure {
        return $this->errorHandler;
    }
    public function __debugInfo() : array {
        // @codeCoverageIgnoreStart
        return \array_map(fn(DriverCallback $callback) => [
            'type' => $this->getType($callback->id),
            'enabled' => $callback->enabled,
            'referenced' => $callback->referenced,
        ], $this->callbacks);
        // @codeCoverageIgnoreEnd
    }
    public function getIdentifiers() : array {
        return \array_keys($this->callbacks);
    }
    public function getType(string $callbackId) : CallbackType {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return match ($callback::class) {
            DeferCallback::class => CallbackType::Defer,
            TimerCallback::class => $callback->repeat ? CallbackType::Repeat : CallbackType::Delay,
            StreamReadableCallback::class => CallbackType::Readable,
            StreamWritableCallback::class => CallbackType::Writable,
            SignalCallback::class => CallbackType::Signal,
        };
    }
    public function isEnabled(string $callbackId) : bool {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return $callback->enabled;
    }
    public function isReferenced(string $callbackId) : bool {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return $callback->referenced;
    }
    
    /**
     * Activates (enables) all the given callbacks.
     */
    protected abstract function activate(array $callbacks) : void;
    
    /**
     * Dispatches any pending read/write, timer, and signal events.
     */
    protected abstract function dispatch(bool $blocking) : void;
    
    /**
     * Deactivates (disables) the given callback.
     */
    protected abstract function deactivate(DriverCallback $callback) : void;
    protected final function enqueueCallback(DriverCallback $callback) : void {
        $this->callbackQueue
            ->enqueue($callback);
        $this->idle = false;
    }
    
    /**
     * Invokes the error handler with the given exception.
     *
     * @param \Throwable $exception The exception thrown from an event callback.
     */
    protected final function error(\Closure $closure, \Throwable $exception) : void {
        if ($this->errorHandler === null) {
            // Explicitly override the previous interrupt if it exists in this case; hiding the exception is worse.
            $this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingCallback($closure, $exception);
            return;
        }
        $fiber = new \Fiber($this->errorCallback);
        
        /** @noinspection PhpUnhandledExceptionInspection */
        $fiber->start($this->errorHandler, $exception);
    }
    
    /**
     * Returns the current event loop time in second increments.
     *
     * Note this value does not necessarily correlate to wall-clock time, rather the value returned is meant to be used
     * in relative comparisons to prior values returned by this method (intervals, expiration calculations, etc.).
     */
    protected abstract function now() : float;
    private function invokeMicrotasks() : void {
        while (!$this->microtaskQueue
            ->isEmpty()) {
            [
                $callback,
                $args,
            ] = $this->microtaskQueue
                ->dequeue();
            try {
                // Clear $args to allow garbage collection
                $callback(...$args, ...$args = []);
            } catch (\Throwable $exception) {
                $this->error($callback, $exception);
            } finally {
                FiberLocal::clear();
            }
            unset($callback, $args);
            if ($this->interrupt) {
                
                /** @noinspection PhpUnhandledExceptionInspection */
                \Fiber::suspend($this->internalSuspensionMarker);
            }
        }
    }
    
    /**
     * @return bool True if no enabled and referenced callbacks remain in the loop.
     */
    private function isEmpty() : bool {
        foreach ($this->callbacks as $callback) {
            if ($callback->enabled && $callback->referenced) {
                return false;
            }
        }
        return true;
    }
    
    /**
     * Executes a single tick of the event loop.
     */
    private function tick(bool $previousIdle) : void {
        $this->activate($this->enableQueue);
        foreach ($this->enableQueue as $callback) {
            $callback->invokable = true;
        }
        $this->enableQueue = [];
        foreach ($this->enableDeferQueue as $callback) {
            $callback->invokable = true;
            $this->enqueueCallback($callback);
        }
        $this->enableDeferQueue = [];
        $blocking = $previousIdle && !$this->stopped && !$this->isEmpty();
        if ($blocking) {
            $this->invokeCallbacks();
            
            /** @psalm-suppress TypeDoesNotContainType */
            if (!empty($this->enableDeferQueue) || !empty($this->enableQueue)) {
                $blocking = false;
            }
        }
        
        /** @psalm-suppress RedundantCondition */
        $this->dispatch($blocking);
    }
    private function invokeCallbacks() : void {
        while (!$this->microtaskQueue
            ->isEmpty() || !$this->callbackQueue
            ->isEmpty()) {
            
            /** @noinspection PhpUnhandledExceptionInspection */
            $yielded = $this->callbackFiber
                ->isStarted() ? $this->callbackFiber
                ->resume() : $this->callbackFiber
                ->start();
            if ($yielded !== $this->internalSuspensionMarker) {
                $this->createCallbackFiber();
            }
            if ($this->interrupt) {
                $this->invokeInterrupt();
            }
        }
    }
    
    /**
     * @param \Closure():mixed $interrupt
     */
    private function setInterrupt(\Closure $interrupt) : void {
        \assert($this->interrupt === null);
        $this->interrupt = $interrupt;
    }
    private function invokeInterrupt() : void {
        \assert($this->interrupt !== null);
        $interrupt = $this->interrupt;
        $this->interrupt = null;
        
        /** @noinspection PhpUnhandledExceptionInspection */
        \Fiber::suspend($interrupt);
    }
    private function createLoopFiber() : void {
        $this->fiber = new \Fiber(function () : void {
            $this->stopped = false;
            // Invoke microtasks if we have some
            $this->invokeCallbacks();
            
            /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
            while (!$this->stopped) {
                if ($this->interrupt) {
                    $this->invokeInterrupt();
                }
                if ($this->isEmpty()) {
                    return;
                }
                $previousIdle = $this->idle;
                $this->idle = true;
                $this->tick($previousIdle);
                $this->invokeCallbacks();
            }
        });
    }
    private function createCallbackFiber() : void {
        $this->callbackFiber = new \Fiber(function () : void {
            do {
                $this->invokeMicrotasks();
                while (!$this->callbackQueue
                    ->isEmpty()) {
                    
                    /** @var DriverCallback $callback */
                    $callback = $this->callbackQueue
                        ->dequeue();
                    if (!isset($this->callbacks[$callback->id]) || !$callback->invokable) {
                        unset($callback);
                        continue;
                    }
                    if ($callback instanceof DeferCallback) {
                        $this->cancel($callback->id);
                    }
                    elseif ($callback instanceof TimerCallback) {
                        if (!$callback->repeat) {
                            $this->cancel($callback->id);
                        }
                        else {
                            // Disable and re-enable, so it's not executed repeatedly in the same tick
                            // See https://github.com/amphp/amp/issues/131
                            $this->disable($callback->id);
                            $this->enable($callback->id);
                        }
                    }
                    try {
                        $result = match (true) {
                            $callback instanceof StreamCallback => ($callback->closure)($callback->id, $callback->stream),
                            $callback instanceof SignalCallback => ($callback->closure)($callback->id, $callback->signal),
                            default => ($callback->closure)($callback->id),
                        };
                        if ($result !== null) {
                            throw InvalidCallbackError::nonNullReturn($callback->id, $callback->closure);
                        }
                    } catch (\Throwable $exception) {
                        $this->error($callback->closure, $exception);
                    } finally {
                        FiberLocal::clear();
                    }
                    unset($callback);
                    if ($this->interrupt) {
                        
                        /** @noinspection PhpUnhandledExceptionInspection */
                        \Fiber::suspend($this->internalSuspensionMarker);
                    }
                    $this->invokeMicrotasks();
                }
                
                /** @noinspection PhpUnhandledExceptionInspection */
                \Fiber::suspend($this->internalSuspensionMarker);
            } while (true);
        });
    }
    private function createErrorCallback() : void {
        $this->errorCallback = function (\Closure $errorHandler, \Throwable $exception) : void {
            try {
                $errorHandler($exception);
            } catch (\Throwable $exception) {
                $this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception);
            }
        };
    }

}
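
The `invokeCallbacks()` and `createCallbackFiber()` methods above rely on a pattern that may be easier to see in isolation: a fiber is started on first use and resumed afterwards, and a private marker object distinguishes an intentional pause from any other value the fiber might hand back. The sketch below uses only PHP's built-in `\Fiber` class (PHP 8.1+). The names `$marker`, `$queue`, `$worker`, and `$drive` are hypothetical, and the real driver does considerably more (interrupts, error handling, fiber re-creation).

```php
<?php

// Illustrative sketch only; names are hypothetical and not taken from Revolt.
$marker = new \stdClass();          // Identity token meaning "paused on purpose".
$queue = new \SplQueue();
$handled = [];

$worker = new \Fiber(function () use ($marker, $queue, &$handled): void {
    while (true) {
        while (!$queue->isEmpty()) {
            $handled[] = $queue->dequeue();
        }
        // Hand control back to the caller until more work arrives.
        \Fiber::suspend($marker);
    }
});

// Start the fiber on first use, resume it on every later call.
$drive = static function (\Fiber $fiber): mixed {
    return $fiber->isStarted() ? $fiber->resume() : $fiber->start();
};

$queue->enqueue('first');
$yielded1 = $drive($worker);        // Starts the fiber; it drains the queue and pauses.

$queue->enqueue('second');
$queue->enqueue('third');
$yielded2 = $drive($worker);        // Resumes the same fiber where it paused.

\var_dump($yielded1 === $marker, $yielded2 === $marker);
echo \implode(', ', $handled), \PHP_EOL;
```

Both comparisons are true here because the worker only ever suspends with the marker. In the driver, a value other than the marker signals that the callback fiber was suspended by user code, which is why `invokeCallbacks()` then creates a fresh callback fiber.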

Members

Title Modifiers Object type Summary Overridden Title Overrides
AbstractDriver::$callbackFiber private property
AbstractDriver::$callbackQueue private property @var \SplQueue<DriverCallback>
AbstractDriver::$callbacks private property @var array<string, DriverCallback>
AbstractDriver::$enableDeferQueue private property @var array<string, DriverCallback>
AbstractDriver::$enableQueue private property @var array<string, DriverCallback>
AbstractDriver::$errorCallback private property
AbstractDriver::$errorHandler private property @var null|\Closure(\Throwable):void
AbstractDriver::$fiber private property
AbstractDriver::$idle private property
AbstractDriver::$internalSuspensionMarker private property
AbstractDriver::$interrupt private property @var null|\Closure():mixed
AbstractDriver::$interruptCallback private property
AbstractDriver::$microtaskQueue private property @var \SplQueue<array{\Closure, array}>
AbstractDriver::$nextId private property @var string Next callback identifier.
AbstractDriver::$queueCallback private property
AbstractDriver::$runCallback private property
AbstractDriver::$stopped private property
AbstractDriver::$suspensions private property @var \WeakMap<object, \WeakReference<DriverSuspension>>
AbstractDriver::activate abstract protected function Activates (enables) all the given callbacks. 4
AbstractDriver::cancel public function Cancel a callback. Overrides Driver::cancel 3
AbstractDriver::createCallbackFiber private function
AbstractDriver::createErrorCallback private function
AbstractDriver::createLoopFiber private function
AbstractDriver::deactivate abstract protected function Deactivates (disables) the given callback. 4
AbstractDriver::defer public function Defer the execution of a callback. Overrides Driver::defer
AbstractDriver::delay public function Delay the execution of a callback. Overrides Driver::delay
AbstractDriver::disable public function Disable a callback immediately. Overrides Driver::disable
AbstractDriver::dispatch abstract protected function Dispatches any pending read/write, timer, and signal events. 4
AbstractDriver::enable public function Enable a callback to be active starting in the next tick. Overrides Driver::enable
AbstractDriver::enqueueCallback final protected function
AbstractDriver::error final protected function Invokes the error handler with the given exception.
AbstractDriver::getErrorHandler public function Gets the error handler closure or `null` if none is set. Overrides Driver::getErrorHandler
AbstractDriver::getIdentifiers public function Returns all registered non-cancelled callback identifiers. Overrides Driver::getIdentifiers
AbstractDriver::getSuspension public function Returns an object used to suspend and resume execution of the current fiber or {main}. Overrides Driver::getSuspension
AbstractDriver::getType public function Returns the type of the callback identified by the given callback identifier. Overrides Driver::getType
AbstractDriver::invokeCallbacks private function
AbstractDriver::invokeInterrupt private function
AbstractDriver::invokeMicrotasks private function
AbstractDriver::isEmpty private function
AbstractDriver::isEnabled public function Returns whether the callback identified by the given callback identifier is currently enabled. Overrides Driver::isEnabled
AbstractDriver::isReferenced public function Returns whether the callback identified by the given callback identifier is currently referenced. Overrides Driver::isReferenced
AbstractDriver::isRunning public function Overrides Driver::isRunning
AbstractDriver::now abstract protected function Returns the current event loop time in second increments. 4
AbstractDriver::onReadable public function Execute a callback when a stream resource becomes readable or is closed for reading. Overrides Driver::onReadable
AbstractDriver::onSignal public function Execute a callback when a signal is received. Overrides Driver::onSignal 1
AbstractDriver::onWritable public function Execute a callback when a stream resource becomes writable or is closed for writing. Overrides Driver::onWritable
AbstractDriver::queue public function Queue a microtask. Overrides Driver::queue
AbstractDriver::reference public function Reference a callback. Overrides Driver::reference
AbstractDriver::repeat public function Repeatedly execute a callback. Overrides Driver::repeat
AbstractDriver::run public function Run the event loop. Overrides Driver::run 2
AbstractDriver::setErrorHandler public function Set a callback to be executed when an error occurs. Overrides Driver::setErrorHandler
AbstractDriver::setInterrupt private function
AbstractDriver::stop public function Stop the event loop. Overrides Driver::stop 2
AbstractDriver::tick private function Executes a single tick of the event loop.
AbstractDriver::unreference public function Unreference a callback. Overrides Driver::unreference
AbstractDriver::__construct public function 4
AbstractDriver::__debugInfo public function Returns some useful information about the event loop. Overrides Driver::__debugInfo
Driver::getHandle public function Get the underlying loop handle. 5
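
The `$nextId` property listed above starts at `"a"` and is advanced with `$this->nextId++` each time a callback is registered. As a small illustration of what that produces, the sketch below applies PHP's alphabetic string increment to a local variable. The variable names are hypothetical. On PHP 8.3 and later, `str_increment()` is the explicit function for the same operation.

```php
<?php

// Sketch of alphabetic string identifiers, mirroring `$this->nextId++` in the source above.
$nextId = 'a';
$ids = [];

for ($i = 0; $i < 28; $i++) {
    // Post-increment yields the current value, then advances the string.
    $ids[] = $nextId++;
}

echo $ids[0], ' ', $ids[25], ' ', $ids[26], ' ', $ids[27], \PHP_EOL; // prints "a z aa ab"
```

Identifiers therefore stay short, unique strings for the lifetime of a driver instance, which is what methods such as `enable()`, `disable()`, and `cancel()` accept.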