Skip to main content
Drupal API
User account menu
  • Log in

Breadcrumb

  1. Drupal Core 11.1.x

AbstractDriver.php

Namespace

Revolt\EventLoop\Internal

File

vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php

View source
<?php

declare (strict_types=1);
namespace Revolt\EventLoop\Internal;

use Revolt\EventLoop\CallbackType;
use Revolt\EventLoop\Driver;
use Revolt\EventLoop\FiberLocal;
use Revolt\EventLoop\InvalidCallbackError;
use Revolt\EventLoop\Suspension;
use Revolt\EventLoop\UncaughtThrowable;

/**
 * Event loop driver which implements all basic operations to allow interoperability.
 *
 * Callbacks (enabled or new callbacks) MUST immediately be marked as enabled, but only be activated (i.e. callbacks can
 * be called) right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
 *
 * All registered callbacks MUST NOT be called from a file with strict types enabled (`declare(strict_types=1)`).
 *
 * @internal
 */
abstract class AbstractDriver implements Driver {
    
    /** @var string Next callback identifier. */
    private string $nextId = "a";
    private \Fiber $fiber;
    private \Fiber $callbackFiber;
    private \Closure $errorCallback;
    
    /** @var array<string, DriverCallback> */
    private array $callbacks = [];
    
    /** @var array<string, DriverCallback> */
    private array $enableQueue = [];
    
    /** @var array<string, DriverCallback> */
    private array $enableDeferQueue = [];
    
    /** @var null|\Closure(\Throwable):void */
    private ?\Closure $errorHandler = null;
    
    /** @var null|\Closure():mixed */
    private ?\Closure $interrupt = null;
    private readonly \Closure $interruptCallback;
    private readonly \Closure $queueCallback;
    private readonly \Closure $runCallback;
    private readonly \stdClass $internalSuspensionMarker;
    
    /** @var \SplQueue<array{\Closure, array}> */
    private readonly \SplQueue $microtaskQueue;
    
    /** @var \SplQueue<DriverCallback> */
    private readonly \SplQueue $callbackQueue;
    private bool $idle = false;
    private bool $stopped = false;
    
    /** @var \WeakMap<object, \WeakReference<DriverSuspension>> */
    private \WeakMap $suspensions;
    public function __construct() {
        if (\PHP_VERSION_ID < 80117 || \PHP_VERSION_ID >= 80200 && \PHP_VERSION_ID < 80204) {
            // PHP GC is broken on early 8.1 and 8.2 versions, see https://github.com/php/php-src/issues/10496
            if (!\getenv('REVOLT_DRIVER_SUPPRESS_ISSUE_10496')) {
                throw new \Error('Your version of PHP is affected by serious garbage collector bugs related to fibers. Please upgrade to a newer version of PHP, i.e. >= 8.1.17 or => 8.2.4');
            }
        }
        $this->suspensions = new \WeakMap();
        $this->internalSuspensionMarker = new \stdClass();
        $this->microtaskQueue = new \SplQueue();
        $this->callbackQueue = new \SplQueue();
        $this->createLoopFiber();
        $this->createCallbackFiber();
        $this->createErrorCallback();
        
        /** @psalm-suppress InvalidArgument */
        $this->interruptCallback = $this->setInterrupt(...);
        $this->queueCallback = $this->queue(...);
        $this->runCallback = function () {
            if ($this->fiber
                ->isTerminated()) {
                $this->createLoopFiber();
            }
            return $this->fiber
                ->isStarted() ? $this->fiber
                ->resume() : $this->fiber
                ->start();
        };
    }
    public function run() : void {
        if ($this->fiber
            ->isRunning()) {
            throw new \Error("The event loop is already running");
        }
        if (\Fiber::getCurrent()) {
            throw new \Error(\sprintf("Can't call %s() within a fiber (i.e., outside of {main})", __METHOD__));
        }
        if ($this->fiber
            ->isTerminated()) {
            $this->createLoopFiber();
        }
        
        /** @noinspection PhpUnhandledExceptionInspection */
        $lambda = $this->fiber
            ->isStarted() ? $this->fiber
            ->resume() : $this->fiber
            ->start();
        if ($lambda) {
            $lambda();
            throw new \Error('Interrupt from event loop must throw an exception: ' . ClosureHelper::getDescription($lambda));
        }
    }
    public function stop() : void {
        $this->stopped = true;
    }
    public function isRunning() : bool {
        return $this->fiber
            ->isRunning() || $this->fiber
            ->isSuspended();
    }
    public function queue(\Closure $closure, mixed ...$args) : void {
        $this->microtaskQueue
            ->enqueue([
            $closure,
            $args,
        ]);
    }
    public function defer(\Closure $closure) : string {
        $deferCallback = new DeferCallback($this->nextId++, $closure);
        $this->callbacks[$deferCallback->id] = $deferCallback;
        $this->enableDeferQueue[$deferCallback->id] = $deferCallback;
        return $deferCallback->id;
    }
    public function delay(float $delay, \Closure $closure) : string {
        if ($delay < 0) {
            throw new \Error("Delay must be greater than or equal to zero");
        }
        $timerCallback = new TimerCallback($this->nextId++, $delay, $closure, $this->now() + $delay);
        $this->callbacks[$timerCallback->id] = $timerCallback;
        $this->enableQueue[$timerCallback->id] = $timerCallback;
        return $timerCallback->id;
    }
    public function repeat(float $interval, \Closure $closure) : string {
        if ($interval < 0) {
            throw new \Error("Interval must be greater than or equal to zero");
        }
        $timerCallback = new TimerCallback($this->nextId++, $interval, $closure, $this->now() + $interval, true);
        $this->callbacks[$timerCallback->id] = $timerCallback;
        $this->enableQueue[$timerCallback->id] = $timerCallback;
        return $timerCallback->id;
    }
    public function onReadable(mixed $stream, \Closure $closure) : string {
        $streamCallback = new StreamReadableCallback($this->nextId++, $closure, $stream);
        $this->callbacks[$streamCallback->id] = $streamCallback;
        $this->enableQueue[$streamCallback->id] = $streamCallback;
        return $streamCallback->id;
    }
    public function onWritable($stream, \Closure $closure) : string {
        $streamCallback = new StreamWritableCallback($this->nextId++, $closure, $stream);
        $this->callbacks[$streamCallback->id] = $streamCallback;
        $this->enableQueue[$streamCallback->id] = $streamCallback;
        return $streamCallback->id;
    }
    public function onSignal(int $signal, \Closure $closure) : string {
        $signalCallback = new SignalCallback($this->nextId++, $closure, $signal);
        $this->callbacks[$signalCallback->id] = $signalCallback;
        $this->enableQueue[$signalCallback->id] = $signalCallback;
        return $signalCallback->id;
    }
    public function enable(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            throw InvalidCallbackError::invalidIdentifier($callbackId);
        }
        $callback = $this->callbacks[$callbackId];
        if ($callback->enabled) {
            return $callbackId;
            // Callback already enabled.
        }
        $callback->enabled = true;
        if ($callback instanceof DeferCallback) {
            $this->enableDeferQueue[$callback->id] = $callback;
        }
        elseif ($callback instanceof TimerCallback) {
            $callback->expiration = $this->now() + $callback->interval;
            $this->enableQueue[$callback->id] = $callback;
        }
        else {
            $this->enableQueue[$callback->id] = $callback;
        }
        return $callbackId;
    }
    public function cancel(string $callbackId) : void {
        $this->disable($callbackId);
        unset($this->callbacks[$callbackId]);
    }
    public function disable(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            return $callbackId;
        }
        $callback = $this->callbacks[$callbackId];
        if (!$callback->enabled) {
            return $callbackId;
            // Callback already disabled.
        }
        $callback->enabled = false;
        $callback->invokable = false;
        $id = $callback->id;
        if ($callback instanceof DeferCallback) {
            // Callback was only queued to be enabled.
            unset($this->enableDeferQueue[$id]);
        }
        elseif (isset($this->enableQueue[$id])) {
            // Callback was only queued to be enabled.
            unset($this->enableQueue[$id]);
        }
        else {
            $this->deactivate($callback);
        }
        return $callbackId;
    }
    public function reference(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            throw InvalidCallbackError::invalidIdentifier($callbackId);
        }
        $this->callbacks[$callbackId]->referenced = true;
        return $callbackId;
    }
    public function unreference(string $callbackId) : string {
        if (!isset($this->callbacks[$callbackId])) {
            return $callbackId;
        }
        $this->callbacks[$callbackId]->referenced = false;
        return $callbackId;
    }
    public function getSuspension() : Suspension {
        $fiber = \Fiber::getCurrent();
        // User callbacks are always executed outside the event loop fiber, so this should always be false.
        \assert($fiber !== $this->fiber);
        // Use queue closure in case of {main}, which can be unset by DriverSuspension after an uncaught exception.
        $key = $fiber ?? $this->queueCallback;
        $suspension = ($this->suspensions[$key] ?? null)?->get();
        if ($suspension) {
            return $suspension;
        }
        $suspension = new DriverSuspension($this->runCallback, $this->queueCallback, $this->interruptCallback, $this->suspensions);
        $this->suspensions[$key] = \WeakReference::create($suspension);
        return $suspension;
    }
    public function setErrorHandler(?\Closure $errorHandler) : void {
        $this->errorHandler = $errorHandler;
    }
    public function getErrorHandler() : ?\Closure {
        return $this->errorHandler;
    }
    public function __debugInfo() : array {
        // @codeCoverageIgnoreStart
        return \array_map(fn(DriverCallback $callback) => [
            'type' => $this->getType($callback->id),
            'enabled' => $callback->enabled,
            'referenced' => $callback->referenced,
        ], $this->callbacks);
        // @codeCoverageIgnoreEnd
    }
    public function getIdentifiers() : array {
        return \array_keys($this->callbacks);
    }
    public function getType(string $callbackId) : CallbackType {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return match ($callback::class) {    DeferCallback::class => CallbackType::Defer,
            TimerCallback::class => $callback->repeat ? CallbackType::Repeat : CallbackType::Delay,
            StreamReadableCallback::class => CallbackType::Readable,
            StreamWritableCallback::class => CallbackType::Writable,
            SignalCallback::class => CallbackType::Signal,
        
        };
    }
    public function isEnabled(string $callbackId) : bool {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return $callback->enabled;
    }
    public function isReferenced(string $callbackId) : bool {
        $callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
        return $callback->referenced;
    }
    
    /**
     * Activates (enables) all the given callbacks.
     */
    protected abstract function activate(array $callbacks) : void;
    
    /**
     * Dispatches any pending read/write, timer, and signal events.
     */
    protected abstract function dispatch(bool $blocking) : void;
    
    /**
     * Deactivates (disables) the given callback.
     */
    protected abstract function deactivate(DriverCallback $callback) : void;
    protected final function enqueueCallback(DriverCallback $callback) : void {
        $this->callbackQueue
            ->enqueue($callback);
        $this->idle = false;
    }
    
    /**
     * Invokes the error handler with the given exception.
     *
     * @param \Throwable $exception The exception thrown from an event callback.
     */
    protected final function error(\Closure $closure, \Throwable $exception) : void {
        if ($this->errorHandler === null) {
            // Explicitly override the previous interrupt if it exists in this case, hiding the exception is worse
            $this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingCallback($closure, $exception);
            return;
        }
        $fiber = new \Fiber($this->errorCallback);
        
        /** @noinspection PhpUnhandledExceptionInspection */
        $fiber->start($this->errorHandler, $exception);
    }
    
    /**
     * Returns the current event loop time in second increments.
     *
     * Note this value does not necessarily correlate to wall-clock time, rather the value returned is meant to be used
     * in relative comparisons to prior values returned by this method (intervals, expiration calculations, etc.).
     */
    protected abstract function now() : float;
    private function invokeMicrotasks() : void {
        while (!$this->microtaskQueue
            ->isEmpty()) {
            [
                $callback,
                $args,
            ] = $this->microtaskQueue
                ->dequeue();
            try {
                // Clear $args to allow garbage collection
                $callback(...$args, ...$args = []);
            } catch (\Throwable $exception) {
                $this->error($callback, $exception);
            } finally {
                FiberLocal::clear();
            }
            unset($callback, $args);
            if ($this->interrupt) {
                
                /** @noinspection PhpUnhandledExceptionInspection */
                \Fiber::suspend($this->internalSuspensionMarker);
            }
        }
    }
    
    /**
     * @return bool True if no enabled and referenced callbacks remain in the loop.
     */
    private function isEmpty() : bool {
        foreach ($this->callbacks as $callback) {
            if ($callback->enabled && $callback->referenced) {
                return false;
            }
        }
        return true;
    }
    
    /**
     * Executes a single tick of the event loop.
     */
    private function tick(bool $previousIdle) : void {
        $this->activate($this->enableQueue);
        foreach ($this->enableQueue as $callback) {
            $callback->invokable = true;
        }
        $this->enableQueue = [];
        foreach ($this->enableDeferQueue as $callback) {
            $callback->invokable = true;
            $this->enqueueCallback($callback);
        }
        $this->enableDeferQueue = [];
        $blocking = $previousIdle && !$this->stopped && !$this->isEmpty();
        if ($blocking) {
            $this->invokeCallbacks();
            
            /** @psalm-suppress TypeDoesNotContainType */
            if (!empty($this->enableDeferQueue) || !empty($this->enableQueue)) {
                $blocking = false;
            }
        }
        
        /** @psalm-suppress RedundantCondition */
        $this->dispatch($blocking);
    }
    private function invokeCallbacks() : void {
        while (!$this->microtaskQueue
            ->isEmpty() || !$this->callbackQueue
            ->isEmpty()) {
            
            /** @noinspection PhpUnhandledExceptionInspection */
            $yielded = $this->callbackFiber
                ->isStarted() ? $this->callbackFiber
                ->resume() : $this->callbackFiber
                ->start();
            if ($yielded !== $this->internalSuspensionMarker) {
                $this->createCallbackFiber();
            }
            if ($this->interrupt) {
                $this->invokeInterrupt();
            }
        }
    }
    
    /**
     * @param \Closure():mixed $interrupt
     */
    private function setInterrupt(\Closure $interrupt) : void {
        \assert($this->interrupt === null);
        $this->interrupt = $interrupt;
    }
    private function invokeInterrupt() : void {
        \assert($this->interrupt !== null);
        $interrupt = $this->interrupt;
        $this->interrupt = null;
        
        /** @noinspection PhpUnhandledExceptionInspection */
        \Fiber::suspend($interrupt);
    }
    private function createLoopFiber() : void {
        $this->fiber = new \Fiber(function () : void {
            $this->stopped = false;
            // Invoke microtasks if we have some
            $this->invokeCallbacks();
            
            /** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
            while (!$this->stopped) {
                if ($this->interrupt) {
                    $this->invokeInterrupt();
                }
                if ($this->isEmpty()) {
                    return;
                }
                $previousIdle = $this->idle;
                $this->idle = true;
                $this->tick($previousIdle);
                $this->invokeCallbacks();
            }
        });
    }
    private function createCallbackFiber() : void {
        $this->callbackFiber = new \Fiber(function () : void {
            do {
                $this->invokeMicrotasks();
                while (!$this->callbackQueue
                    ->isEmpty()) {
                    
                    /** @var DriverCallback $callback */
                    $callback = $this->callbackQueue
                        ->dequeue();
                    if (!isset($this->callbacks[$callback->id]) || !$callback->invokable) {
                        unset($callback);
                        continue;
                    }
                    if ($callback instanceof DeferCallback) {
                        $this->cancel($callback->id);
                    }
                    elseif ($callback instanceof TimerCallback) {
                        if (!$callback->repeat) {
                            $this->cancel($callback->id);
                        }
                        else {
                            // Disable and re-enable, so it's not executed repeatedly in the same tick
                            // See https://github.com/amphp/amp/issues/131
                            $this->disable($callback->id);
                            $this->enable($callback->id);
                        }
                    }
                    try {
                        $result = match (true) {    $callback instanceof StreamCallback => ($callback->closure)($callback->id, $callback->stream),
                            $callback instanceof SignalCallback => ($callback->closure)($callback->id, $callback->signal),
                            default => ($callback->closure)($callback->id),
                        
                        };
                        if ($result !== null) {
                            throw InvalidCallbackError::nonNullReturn($callback->id, $callback->closure);
                        }
                    } catch (\Throwable $exception) {
                        $this->error($callback->closure, $exception);
                    } finally {
                        FiberLocal::clear();
                    }
                    unset($callback);
                    if ($this->interrupt) {
                        
                        /** @noinspection PhpUnhandledExceptionInspection */
                        \Fiber::suspend($this->internalSuspensionMarker);
                    }
                    $this->invokeMicrotasks();
                }
                
                /** @noinspection PhpUnhandledExceptionInspection */
                \Fiber::suspend($this->internalSuspensionMarker);
            } while (true);
        });
    }
    private function createErrorCallback() : void {
        $this->errorCallback = function (\Closure $errorHandler, \Throwable $exception) : void {
            try {
                $errorHandler($exception);
            } catch (\Throwable $exception) {
                $this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception);
            }
        };
    }

}

Classes

Title Deprecated Summary
AbstractDriver Event loop driver which implements all basic operations to allow interoperability.
RSS feed
Powered by Drupal