Skip to main content
Drupal API
User account menu
  • Log in

Breadcrumb

  1. Drupal Core 11.1.x
  2. StreamSelectDriver.php

class StreamSelectDriver

Hierarchy

  • class \Revolt\EventLoop\Internal\AbstractDriver implements \Revolt\EventLoop\Driver
    • class \Revolt\EventLoop\Driver\StreamSelectDriver extends \Revolt\EventLoop\Internal\AbstractDriver

Expanded class hierarchy of StreamSelectDriver

1 file declares its use of StreamSelectDriver
DriverFactory.php in vendor/revolt/event-loop/src/EventLoop/DriverFactory.php

File

vendor/revolt/event-loop/src/EventLoop/Driver/StreamSelectDriver.php, line 18

Namespace

Revolt\EventLoop\Driver
View source
/**
 * Event loop driver that polls streams with stream_select().
 *
 * Readable/writable streams are watched through stream_select(), timers are
 * kept in a TimerQueue, and signals are handled through the pcntl extension
 * when it is available.
 */
final class StreamSelectDriver extends AbstractDriver {

    /** @var array<int, resource> Streams watched for readability, keyed by (int) resource ID. */
    private array $readStreams = [];

    /** @var array<int, array<string, StreamReadableCallback>> Read callbacks by stream ID, then callback ID. */
    private array $readCallbacks = [];

    /** @var array<int, resource> Streams watched for writability, keyed by (int) resource ID. */
    private array $writeStreams = [];

    /** @var array<int, array<string, StreamWritableCallback>> Write callbacks by stream ID, then callback ID. */
    private array $writeCallbacks = [];

    /** Queue of active timer callbacks. */
    private readonly TimerQueue $timerQueue;

    /** @var array<int, array<string, SignalCallback>> Signal callbacks by signal number, then callback ID. */
    private array $signalCallbacks = [];

    /** @var \SplQueue<int> Signal numbers received by handleSignal() and not yet dispatched. */
    private readonly \SplQueue $signalQueue;

    /** Whether the pcntl functions needed for signal handling are available. */
    private bool $signalHandling;

    /** Error handler installed around stream_select() calls. */
    private readonly \Closure $streamSelectErrorHandler;

    /** Set by the error handler when stream_select() was interrupted (EINTR) and its result must be discarded. */
    private bool $streamSelectIgnoreResult = false;

    /**
     * Initializes the queues, detects pcntl support and prepares the
     * stream_select() error handler.
     */
    public function __construct() {
        parent::__construct();
        $this->signalQueue = new \SplQueue();
        $this->timerQueue = new TimerQueue();
        $this->signalHandling = \extension_loaded("pcntl") && \function_exists('pcntl_signal_dispatch') && \function_exists('pcntl_signal');
        $this->streamSelectErrorHandler = function (int $errno, string $message) : void {
            // Casing changed in PHP 8 from 'unable' to 'Unable'
            if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) {
                // EINTR: not a failure, just flag the result of this call as unusable.
                $this->streamSelectIgnoreResult = true;
                return;
            }
            if (\str_contains($message, 'FD_SETSIZE')) {
                // Collapse line breaks so the pattern below can match the whole message.
                $message = \str_replace([
                    "\r\n",
                    "\n",
                    "\r",
                ], " ", $message);
                $pattern = '(stream_select\\(\\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\\d+), but you have descriptors numbered at least as high as (\\d+)\\.)';
                if (\preg_match($pattern, $message, $match)) {
                    $helpLink = 'https://revolt.run/extensions';
                    $message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1] . ', but you have file descriptors numbered at least as high as ' . $match[2] . '. ' . "You can install one of the extensions listed on {$helpLink} to support a higher number of " . "concurrent file descriptors. If a large number of open file descriptors is unexpected, you " . "might be leaking file descriptors that aren't closed correctly.";
                }
            }
            // Every other warning raised by stream_select() becomes an exception.
            throw new \Exception($message, $errno);
        };
    }

    /**
     * Deactivates all remaining signal callbacks, which resets the handler of
     * each affected signal back to SIG_DFL.
     */
    public function __destruct() {
        foreach ($this->signalCallbacks as $signalCallbacks) {
            foreach ($signalCallbacks as $signalCallback) {
                $this->deactivate($signalCallback);
            }
        }
    }

    /**
     * Registers a signal callback after verifying that signal handling is
     * supported by this driver.
     *
     * @throws UnsupportedFeatureException If the pcntl extension is not available.
     */
    public function onSignal(int $signal, \Closure $closure) : string {
        if (!$this->signalHandling) {
            throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
        }
        return parent::onSignal($signal, $closure);
    }

    /**
     * Get the underlying loop handle.
     *
     * @return mixed Always null; this driver has no underlying handle.
     */
    public function getHandle() : mixed {
        return null;
    }

    /**
     * Returns the current event loop time in seconds.
     *
     * @return float Monotonic hrtime() reading (nanoseconds) converted to seconds.
     */
    protected function now() : float {
        // Use the canonical (float) cast; the (double) alias is deprecated as of PHP 8.5.
        return (float) \hrtime(true) / 1000000000;
    }

    /**
     * Dispatches pending signals, stream activity and expired timers.
     *
     * @param bool $blocking Whether the stream poll may wait for the next timer
     *   to expire. Forced to false when a signal was dequeued in this call.
     *
     * @throws \Throwable
     */
    protected function dispatch(bool $blocking) : void {
        if ($this->signalHandling) {
            \pcntl_signal_dispatch();
            while (!$this->signalQueue
                ->isEmpty()) {
                $signal = $this->signalQueue
                    ->dequeue();
                // A queued signal may no longer have any callbacks (e.g. if they
                // were deactivated after the handler ran), so fall back to an
                // empty list instead of reading an undefined array key.
                foreach ($this->signalCallbacks[$signal] ?? [] as $callback) {
                    $this->enqueueCallback($callback);
                }
                // Signal callbacks are pending: do not block in the stream poll.
                $blocking = false;
            }
        }
        $this->selectStreams($this->readStreams, $this->writeStreams, $blocking ? $this->getTimeout() : 0.0);
        $now = $this->now();
        while ($callback = $this->timerQueue
            ->extract($now)) {
            $this->enqueueCallback($callback);
        }
    }

    /**
     * Activates (enables) all the given callbacks.
     *
     * Stream callbacks are indexed by resource ID, timers are inserted into the
     * timer queue, and the first callback for a signal installs the pcntl
     * signal handler.
     *
     * @param array $callbacks Callbacks to activate.
     *
     * @throws UnsupportedFeatureException If registering a signal handler raises an error.
     */
    protected function activate(array $callbacks) : void {
        foreach ($callbacks as $callback) {
            if ($callback instanceof StreamReadableCallback) {
                \assert(\is_resource($callback->stream));
                $streamId = (int) $callback->stream;
                $this->readCallbacks[$streamId][$callback->id] = $callback;
                $this->readStreams[$streamId] = $callback->stream;
            }
            elseif ($callback instanceof StreamWritableCallback) {
                \assert(\is_resource($callback->stream));
                $streamId = (int) $callback->stream;
                $this->writeCallbacks[$streamId][$callback->id] = $callback;
                $this->writeStreams[$streamId] = $callback->stream;
            }
            elseif ($callback instanceof TimerCallback) {
                $this->timerQueue
                    ->insert($callback);
            }
            elseif ($callback instanceof SignalCallback) {
                // Install the handler only once per signal number.
                if (!isset($this->signalCallbacks[$callback->signal])) {
                    // Convert any PHP error raised by pcntl_signal() into an exception.
                    \set_error_handler(static function (int $errno, string $errstr) : bool {
                        throw new UnsupportedFeatureException(\sprintf("Failed to register signal handler; Errno: %d; %s", $errno, $errstr));
                    });
                    // Avoid bug in Psalm handling of first-class callables by assigning to a temp variable.
                    $handler = $this->handleSignal(...);
                    try {
                        \pcntl_signal($callback->signal, $handler);
                    } finally {
                        \restore_error_handler();
                    }
                }
                $this->signalCallbacks[$callback->signal][$callback->id] = $callback;
            }
            else {
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown callback type");
                // @codeCoverageIgnoreEnd
            }
        }
    }

    /**
     * Deactivates (disables) the given callback.
     *
     * When the last callback of a stream is removed the stream is no longer
     * polled; when the last callback of a signal is removed the signal handler
     * is reset to SIG_DFL.
     */
    protected function deactivate(DriverCallback $callback) : void {
        if ($callback instanceof StreamReadableCallback) {
            $streamId = (int) $callback->stream;
            unset($this->readCallbacks[$streamId][$callback->id]);
            if (empty($this->readCallbacks[$streamId])) {
                unset($this->readCallbacks[$streamId], $this->readStreams[$streamId]);
            }
        }
        elseif ($callback instanceof StreamWritableCallback) {
            $streamId = (int) $callback->stream;
            unset($this->writeCallbacks[$streamId][$callback->id]);
            if (empty($this->writeCallbacks[$streamId])) {
                unset($this->writeCallbacks[$streamId], $this->writeStreams[$streamId]);
            }
        }
        elseif ($callback instanceof TimerCallback) {
            $this->timerQueue
                ->remove($callback);
        }
        elseif ($callback instanceof SignalCallback) {
            if (isset($this->signalCallbacks[$callback->signal])) {
                unset($this->signalCallbacks[$callback->signal][$callback->id]);
                if (empty($this->signalCallbacks[$callback->signal])) {
                    unset($this->signalCallbacks[$callback->signal]);
                    // Best effort: swallow any PHP error raised while resetting the handler.
                    \set_error_handler(static fn() => true);
                    try {
                        \pcntl_signal($callback->signal, \SIG_DFL);
                    } finally {
                        \restore_error_handler();
                    }
                }
            }
        }
        else {
            // @codeCoverageIgnoreStart
            throw new \Error("Unknown callback type");
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * Polls the given streams and enqueues the callbacks of those that are
     * ready; without any streams it only sleeps for the timeout.
     *
     * @param array<int, resource> $read
     * @param array<int, resource> $write
     * @param float $timeout Seconds to wait; 0 polls without waiting and a
     *   negative value waits without a time limit.
     *
     * @throws \Exception If stream_select() fails.
     */
    private function selectStreams(array $read, array $write, float $timeout) : void {
        if (!empty($read) || !empty($write)) {
            // Use stream_select() if there are any streams in the loop.
            if ($timeout >= 0) {
                $seconds = (int) $timeout;
                $microseconds = (int) (($timeout - $seconds) * 1000000);
            }
            else {
                // A null timeout makes stream_select() block indefinitely.
                $seconds = null;
                $microseconds = null;
            }
            // Failed connection attempts are indicated via except on Windows
            // @link https://github.com/reactphp/event-loop/blob/8bd064ce23c26c4decf186c2a5a818c9a8209eb0/src/StreamSelectLoop.php#L279-L287
            // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
            $except = null;
            if (\DIRECTORY_SEPARATOR === '\\') {
                $except = $write;
            }
            \set_error_handler($this->streamSelectErrorHandler);
            try {

                /** @psalm-suppress InvalidArgument */
                $result = \stream_select($read, $write, $except, $seconds, $microseconds);
            } finally {
                \restore_error_handler();
            }
            // Nothing to do if the call was interrupted (EINTR) or no stream is ready.
            if ($this->streamSelectIgnoreResult || $result === 0) {
                $this->streamSelectIgnoreResult = false;
                return;
            }
            if ($result === false) {
                throw new \Exception('Unknown error during stream_select');
            }
            foreach ($read as $stream) {
                $streamId = (int) $stream;
                if (!isset($this->readCallbacks[$streamId])) {
                    continue;
                    // All read callbacks disabled.
                }
                foreach ($this->readCallbacks[$streamId] as $callback) {
                    $this->enqueueCallback($callback);
                }
            }

            /** @var array<int, resource>|null $except */
            if ($except) {
                // Treat streams reported via except as writable so their callbacks run.
                foreach ($except as $key => $socket) {
                    $write[$key] = $socket;
                }
            }
            foreach ($write as $stream) {
                $streamId = (int) $stream;
                if (!isset($this->writeCallbacks[$streamId])) {
                    continue;
                    // All write callbacks disabled.
                }
                foreach ($this->writeCallbacks[$streamId] as $callback) {
                    $this->enqueueCallback($callback);
                }
            }
            return;
        }
        if ($timeout < 0) {
            // Only signal callbacks are enabled, so sleep indefinitely.

            /** @psalm-suppress ArgumentTypeCoercion */
            \usleep(\PHP_INT_MAX);
            return;
        }
        if ($timeout > 0) {
            // Sleep until next timer expires.

            /** @psalm-suppress ArgumentTypeCoercion $timeout is positive here. */
            \usleep((int) ($timeout * 1000000));
        }
    }

    /**
     * @return float Seconds until next timer expires or -1 if there are no pending timers.
     */
    private function getTimeout() : float {
        $expiration = $this->timerQueue
            ->peek();
        if ($expiration === null) {
            return -1;
        }
        $expiration -= $this->now();
        // Clamp already-expired timers to zero so the caller never sees a negative wait.
        return $expiration > 0 ? $expiration : 0.0;
    }

    /**
     * Signal handler registered with pcntl_signal(); records the signal for
     * later processing in dispatch().
     */
    private function handleSignal(int $signal) : void {
        // Queue signals, so we don't suspend inside pcntl_signal_dispatch, which disables signals while it runs
        $this->signalQueue
            ->enqueue($signal);
    }

}

Members

Title Sort descending Modifiers Object type Summary Overridden Title Overrides
AbstractDriver::$callbackFiber private property
AbstractDriver::$callbackQueue private property @var \SplQueue<DriverCallback>
AbstractDriver::$callbacks private property @var array<string, DriverCallback>
AbstractDriver::$enableDeferQueue private property @var array<string, DriverCallback>
AbstractDriver::$enableQueue private property @var array<string, DriverCallback>
AbstractDriver::$errorCallback private property
AbstractDriver::$errorHandler private property @var null|\Closure(\Throwable):void
AbstractDriver::$fiber private property
AbstractDriver::$idle private property
AbstractDriver::$internalSuspensionMarker private property
AbstractDriver::$interrupt private property @var null|\Closure():mixed
AbstractDriver::$interruptCallback private property
AbstractDriver::$microtaskQueue private property @var \SplQueue<array{\Closure, array}>
AbstractDriver::$nextId private property @var string Next callback identifier.
AbstractDriver::$queueCallback private property
AbstractDriver::$runCallback private property
AbstractDriver::$stopped private property
AbstractDriver::$suspensions private property @var \WeakMap<object, \WeakReference<DriverSuspension>>
AbstractDriver::cancel public function Cancel a callback. Overrides Driver::cancel 3
AbstractDriver::createCallbackFiber private function
AbstractDriver::createErrorCallback private function
AbstractDriver::createLoopFiber private function
AbstractDriver::defer public function Defer the execution of a callback. Overrides Driver::defer
AbstractDriver::delay public function Delay the execution of a callback. Overrides Driver::delay
AbstractDriver::disable public function Disable a callback immediately. Overrides Driver::disable
AbstractDriver::enable public function Enable a callback to be active starting in the next tick. Overrides Driver::enable
AbstractDriver::enqueueCallback final protected function
AbstractDriver::error final protected function Invokes the error handler with the given exception.
AbstractDriver::getErrorHandler public function Gets the error handler closure or {@code null} if none is set. Overrides Driver::getErrorHandler
AbstractDriver::getIdentifiers public function Returns all registered non-cancelled callback identifiers. Overrides Driver::getIdentifiers
AbstractDriver::getSuspension public function Returns an object used to suspend and resume execution of the current fiber or {main}. Overrides Driver::getSuspension
AbstractDriver::getType public function Returns the type of the callback identified by the given callback identifier. Overrides Driver::getType
AbstractDriver::invokeCallbacks private function
AbstractDriver::invokeInterrupt private function
AbstractDriver::invokeMicrotasks private function
AbstractDriver::isEmpty private function
AbstractDriver::isEnabled public function Returns whether the callback identified by the given callback identifier is currently enabled. Overrides Driver::isEnabled
AbstractDriver::isReferenced public function Returns whether the callback identified by the given callback identifier is currently referenced. Overrides Driver::isReferenced
AbstractDriver::isRunning public function Overrides Driver::isRunning
AbstractDriver::onReadable public function Execute a callback when a stream resource becomes readable or is closed for reading. Overrides Driver::onReadable
AbstractDriver::onWritable public function Execute a callback when a stream resource becomes writable or is closed for writing. Overrides Driver::onWritable
AbstractDriver::queue public function Queue a microtask. Overrides Driver::queue
AbstractDriver::reference public function Reference a callback. Overrides Driver::reference
AbstractDriver::repeat public function Repeatedly execute a callback. Overrides Driver::repeat
AbstractDriver::run public function Run the event loop. Overrides Driver::run 2
AbstractDriver::setErrorHandler public function Set a callback to be executed when an error occurs. Overrides Driver::setErrorHandler
AbstractDriver::setInterrupt private function
AbstractDriver::stop public function Stop the event loop. Overrides Driver::stop 2
AbstractDriver::tick private function Executes a single tick of the event loop.
AbstractDriver::unreference public function Unreference a callback. Overrides Driver::unreference
AbstractDriver::__debugInfo public function Returns some useful information about the event loop. Overrides Driver::__debugInfo
StreamSelectDriver::$readCallbacks private property @var array<int, array<string, StreamReadableCallback>>
StreamSelectDriver::$readStreams private property @var array<int, resource>
StreamSelectDriver::$signalCallbacks private property @var array<int, array<string, SignalCallback>>
StreamSelectDriver::$signalHandling private property
StreamSelectDriver::$signalQueue private property @var \SplQueue<int>
StreamSelectDriver::$streamSelectErrorHandler private property
StreamSelectDriver::$streamSelectIgnoreResult private property
StreamSelectDriver::$timerQueue private property
StreamSelectDriver::$writeCallbacks private property @var array<int, array<string, StreamWritableCallback>>
StreamSelectDriver::$writeStreams private property @var array<int, resource>
StreamSelectDriver::activate protected function Activates (enables) all the given callbacks. Overrides AbstractDriver::activate
StreamSelectDriver::deactivate protected function Deactivates (disables) the given callback. Overrides AbstractDriver::deactivate
StreamSelectDriver::dispatch protected function Overrides AbstractDriver::dispatch
StreamSelectDriver::getHandle public function Get the underlying loop handle. Overrides Driver::getHandle
StreamSelectDriver::getTimeout private function
StreamSelectDriver::handleSignal private function
StreamSelectDriver::now protected function Returns the current event loop time in second increments. Overrides AbstractDriver::now
StreamSelectDriver::onSignal public function Overrides AbstractDriver::onSignal
StreamSelectDriver::selectStreams private function
StreamSelectDriver::__construct public function Overrides AbstractDriver::__construct
StreamSelectDriver::__destruct public function
RSS feed
Powered by Drupal