class StreamSelectDriver
Hierarchy
- class \Revolt\EventLoop\Internal\AbstractDriver implements \Revolt\EventLoop\Driver
- class \Revolt\EventLoop\Driver\StreamSelectDriver extends \Revolt\EventLoop\Internal\AbstractDriver
Expanded class hierarchy of StreamSelectDriver
1 file declares its use of StreamSelectDriver
- DriverFactory.php in vendor/
revolt/ event-loop/ src/ EventLoop/ DriverFactory.php
File
-
vendor/
revolt/ event-loop/ src/ EventLoop/ Driver/ StreamSelectDriver.php, line 18
Namespace
Revolt\EventLoop\Driver
View source
/**
 * Event loop driver built on PHP's native stream_select().
 *
 * Stream callbacks are multiplexed through stream_select(), timers are kept in
 * a TimerQueue, and signals (only when the pcntl extension is usable) are
 * collected in a queue by a pcntl signal handler and turned into callbacks on
 * the next dispatch.
 */
final class StreamSelectDriver extends AbstractDriver
{
    /** @var array<int, resource> Streams watched for readability, keyed by the integer resource ID. */
    private array $readStreams = [];

    /** @var array<int, array<string, StreamReadableCallback>> Read callbacks by stream ID, then callback ID. */
    private array $readCallbacks = [];

    /** @var array<int, resource> Streams watched for writability, keyed by the integer resource ID. */
    private array $writeStreams = [];

    /** @var array<int, array<string, StreamWritableCallback>> Write callbacks by stream ID, then callback ID. */
    private array $writeCallbacks = [];

    /** Pending timer callbacks. */
    private readonly TimerQueue $timerQueue;

    /** @var array<int, array<string, SignalCallback>> Signal callbacks by signal number, then callback ID. */
    private array $signalCallbacks = [];

    /** @var \SplQueue<int> Signal numbers received by the pcntl handler and not yet dispatched. */
    private readonly \SplQueue $signalQueue;

    /** Whether the pcntl functions needed for signal handling are available. */
    private bool $signalHandling;

    /** Error handler installed around stream_select() calls. */
    private readonly \Closure $streamSelectErrorHandler;

    /** Set by the error handler when stream_select() was interrupted (EINTR) and its result must be discarded. */
    private bool $streamSelectIgnoreResult = false;

    public function __construct()
    {
        parent::__construct();

        $this->signalQueue = new \SplQueue();
        $this->timerQueue = new TimerQueue();
        $this->signalHandling = \extension_loaded("pcntl")
            && \function_exists('pcntl_signal_dispatch')
            && \function_exists('pcntl_signal');

        $this->streamSelectErrorHandler = function (int $errno, string $message): void {
            // Casing changed in PHP 8 from 'unable' to 'Unable'
            if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) {
                // EINTR: not an error, but the result of this select call must be ignored.
                $this->streamSelectIgnoreResult = true;

                return;
            }

            if (\str_contains($message, 'FD_SETSIZE')) {
                // Collapse line breaks so the pattern below can match the message on a single line.
                $message = \str_replace([
                    "\r\n",
                    "\n",
                    "\r",
                ], " ", $message);

                $pattern = '(stream_select\\(\\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\\d+), but you have descriptors numbered at least as high as (\\d+)\\.)';

                if (\preg_match($pattern, $message, $match)) {
                    $helpLink = 'https://revolt.run/extensions';

                    $message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1] . ', but you have file descriptors numbered at least as high as ' . $match[2] . '. ' . "You can install one of the extensions listed on {$helpLink} to support a higher number of " . "concurrent file descriptors. If a large number of open file descriptors is unexpected, you " . "might be leaking file descriptors that aren't closed correctly.";
                }
            }

            throw new \Exception($message, $errno);
        };
    }

    /**
     * Deactivates every registered signal callback; deactivating the last callback of a
     * signal resets that signal's handler to SIG_DFL (see deactivate()).
     */
    public function __destruct()
    {
        foreach ($this->signalCallbacks as $signalCallbacks) {
            foreach ($signalCallbacks as $signalCallback) {
                $this->deactivate($signalCallback);
            }
        }
    }

    /**
     * @throws UnsupportedFeatureException If the pcntl extension is not available.
     */
    public function onSignal(int $signal, \Closure $closure): string
    {
        if (!$this->signalHandling) {
            throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
        }

        return parent::onSignal($signal, $closure);
    }

    /**
     * This driver exposes no underlying loop handle.
     */
    public function getHandle(): mixed
    {
        return null;
    }

    /**
     * @return float Current value of the monotonic hrtime() clock, converted from nanoseconds to seconds.
     */
    protected function now(): float
    {
        // Use the canonical (float) cast; the (double) alias is deprecated as of PHP 8.5.
        return (float) \hrtime(true) / 1_000_000_000;
    }

    /**
     * Runs one dispatch pass: drains queued signals, selects on the watched streams and
     * enqueues every timer callback extracted from the timer queue for the current time.
     *
     * @param bool $blocking Whether the stream selection may wait for the next timer. Forced
     *                       to false when a signal was dequeued during this pass.
     *
     * @throws \Throwable
     */
    protected function dispatch(bool $blocking): void
    {
        if ($this->signalHandling) {
            \pcntl_signal_dispatch();

            while (!$this->signalQueue->isEmpty()) {
                $signal = $this->signalQueue->dequeue();

                // The callbacks of a queued signal may have been deactivated before it is
                // dispatched, in which case the entry no longer exists; treat it as empty
                // instead of triggering "undefined array key" / "foreach()" warnings.
                foreach ($this->signalCallbacks[$signal] ?? [] as $callback) {
                    $this->enqueueCallback($callback);
                }

                // A signal was received, so do not wait inside the stream selection below.
                $blocking = false;
            }
        }

        $this->selectStreams(
            $this->readStreams,
            $this->writeStreams,
            $blocking ? $this->getTimeout() : 0.0
        );

        $now = $this->now();

        while ($callback = $this->timerQueue->extract($now)) {
            $this->enqueueCallback($callback);
        }
    }

    /**
     * Registers each given callback with the structure matching its type: stream maps,
     * the timer queue, or the signal map (installing the pcntl handler for the first
     * callback of a signal).
     *
     * @throws UnsupportedFeatureException If registering the signal handler raises a PHP error.
     * @throws \Error If a callback is of an unknown type.
     */
    protected function activate(array $callbacks): void
    {
        foreach ($callbacks as $callback) {
            if ($callback instanceof StreamReadableCallback) {
                \assert(\is_resource($callback->stream));

                $streamId = (int) $callback->stream;
                $this->readCallbacks[$streamId][$callback->id] = $callback;
                $this->readStreams[$streamId] = $callback->stream;
            } elseif ($callback instanceof StreamWritableCallback) {
                \assert(\is_resource($callback->stream));

                $streamId = (int) $callback->stream;
                $this->writeCallbacks[$streamId][$callback->id] = $callback;
                $this->writeStreams[$streamId] = $callback->stream;
            } elseif ($callback instanceof TimerCallback) {
                $this->timerQueue->insert($callback);
            } elseif ($callback instanceof SignalCallback) {
                // Only the first callback for a signal needs to install the handler.
                if (!isset($this->signalCallbacks[$callback->signal])) {
                    \set_error_handler(static function (int $errno, string $errstr): bool {
                        throw new UnsupportedFeatureException(\sprintf("Failed to register signal handler; Errno: %d; %s", $errno, $errstr));
                    });

                    // Avoid bug in Psalm handling of first-class callables by assigning to a temp variable.
                    $handler = $this->handleSignal(...);

                    try {
                        \pcntl_signal($callback->signal, $handler);
                    } finally {
                        \restore_error_handler();
                    }
                }

                $this->signalCallbacks[$callback->signal][$callback->id] = $callback;
            } else {
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown callback type");
                // @codeCoverageIgnoreEnd
            }
        }
    }

    /**
     * Removes the given callback from the structure matching its type. When the last
     * callback of a stream is removed the stream is no longer watched; when the last
     * callback of a signal is removed the signal handler is reset to SIG_DFL.
     *
     * @throws \Error If the callback is of an unknown type.
     */
    protected function deactivate(DriverCallback $callback): void
    {
        if ($callback instanceof StreamReadableCallback) {
            $streamId = (int) $callback->stream;
            unset($this->readCallbacks[$streamId][$callback->id]);

            if (empty($this->readCallbacks[$streamId])) {
                unset($this->readCallbacks[$streamId], $this->readStreams[$streamId]);
            }
        } elseif ($callback instanceof StreamWritableCallback) {
            $streamId = (int) $callback->stream;
            unset($this->writeCallbacks[$streamId][$callback->id]);

            if (empty($this->writeCallbacks[$streamId])) {
                unset($this->writeCallbacks[$streamId], $this->writeStreams[$streamId]);
            }
        } elseif ($callback instanceof TimerCallback) {
            $this->timerQueue->remove($callback);
        } elseif ($callback instanceof SignalCallback) {
            if (isset($this->signalCallbacks[$callback->signal])) {
                unset($this->signalCallbacks[$callback->signal][$callback->id]);

                if (empty($this->signalCallbacks[$callback->signal])) {
                    unset($this->signalCallbacks[$callback->signal]);

                    // Errors raised while resetting the handler are deliberately swallowed.
                    \set_error_handler(static fn () => true);

                    try {
                        \pcntl_signal($callback->signal, \SIG_DFL);
                    } finally {
                        \restore_error_handler();
                    }
                }
            }
        } else {
            // @codeCoverageIgnoreStart
            throw new \Error("Unknown callback type");
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * Waits for stream activity (or just sleeps when no streams are watched) and enqueues
     * the callbacks of every stream reported by stream_select().
     *
     * @param array<int, resource> $read Streams to watch for readability.
     * @param array<int, resource> $write Streams to watch for writability.
     * @param float $timeout Seconds to wait; 0 polls without waiting, a negative value waits
     *                       without a time limit.
     *
     * @throws \Exception If stream_select() raises an error other than EINTR or fails without one.
     */
    private function selectStreams(array $read, array $write, float $timeout): void
    {
        if (!empty($read) || !empty($write)) {
            // Use stream_select() if there are any streams in the loop.
            if ($timeout >= 0) {
                $seconds = (int) $timeout;
                $microseconds = (int) (($timeout - $seconds) * 1_000_000);
            } else {
                // Null seconds/microseconds make stream_select() wait without a timeout.
                $seconds = null;
                $microseconds = null;
            }

            // Failed connection attempts are indicated via except on Windows
            // @link https://github.com/reactphp/event-loop/blob/8bd064ce23c26c4decf186c2a5a818c9a8209eb0/src/StreamSelectLoop.php#L279-L287
            // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
            $except = null;
            if (\DIRECTORY_SEPARATOR === '\\') {
                $except = $write;
            }

            \set_error_handler($this->streamSelectErrorHandler);

            try {
                /** @psalm-suppress InvalidArgument */
                $result = \stream_select($read, $write, $except, $seconds, $microseconds);
            } finally {
                \restore_error_handler();
            }

            // Nothing to do when the call was interrupted (EINTR) or no stream is ready.
            if ($this->streamSelectIgnoreResult || $result === 0) {
                $this->streamSelectIgnoreResult = false;

                return;
            }

            if (!$result) {
                throw new \Exception('Unknown error during stream_select');
            }

            // stream_select() reduced $read/$write/$except to the streams with activity.
            foreach ($read as $stream) {
                $streamId = (int) $stream;

                if (!isset($this->readCallbacks[$streamId])) {
                    continue; // All read callbacks disabled.
                }

                foreach ($this->readCallbacks[$streamId] as $callback) {
                    $this->enqueueCallback($callback);
                }
            }

            /** @var array<int, resource>|null $except */
            if ($except) {
                // Treat streams reported via except like writable ones so their callbacks run.
                foreach ($except as $key => $socket) {
                    $write[$key] = $socket;
                }
            }

            foreach ($write as $stream) {
                $streamId = (int) $stream;

                if (!isset($this->writeCallbacks[$streamId])) {
                    continue; // All write callbacks disabled.
                }

                foreach ($this->writeCallbacks[$streamId] as $callback) {
                    $this->enqueueCallback($callback);
                }
            }

            return;
        }

        if ($timeout < 0) {
            // Only signal callbacks are enabled, so sleep indefinitely.
            /** @psalm-suppress ArgumentTypeCoercion */
            \usleep(\PHP_INT_MAX);

            return;
        }

        if ($timeout > 0) {
            // Sleep until next timer expires.
            /** @psalm-suppress ArgumentTypeCoercion $timeout is positive here. */
            \usleep((int) ($timeout * 1_000_000));
        }
    }

    /**
     * @return float Seconds until next timer expires or -1 if there are no pending timers.
     */
    private function getTimeout(): float
    {
        $expiration = $this->timerQueue->peek();

        if ($expiration === null) {
            return -1;
        }

        $expiration -= $this->now();

        // Already-expired timers yield a zero timeout rather than a negative one.
        return $expiration > 0 ? $expiration : 0.0;
    }

    /**
     * pcntl signal handler: records the received signal number for the next dispatch.
     */
    private function handleSignal(int $signal): void
    {
        // Queue signals, so we don't suspend inside pcntl_signal_dispatch, which disables signals while it runs
        $this->signalQueue->enqueue($signal);
    }
}
Members
Title Sort descending | Modifiers | Object type | Summary | Overridden Title | Overrides |
---|---|---|---|---|---|
AbstractDriver::$callbackFiber | private | property | |||
AbstractDriver::$callbackQueue | private | property | @var \SplQueue<DriverCallback> | ||
AbstractDriver::$callbacks | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$enableDeferQueue | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$enableQueue | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$errorCallback | private | property | |||
AbstractDriver::$errorHandler | private | property | @var null|\Closure(\Throwable):void | ||
AbstractDriver::$fiber | private | property | |||
AbstractDriver::$idle | private | property | |||
AbstractDriver::$internalSuspensionMarker | private | property | |||
AbstractDriver::$interrupt | private | property | @var null|\Closure():mixed | ||
AbstractDriver::$interruptCallback | private | property | |||
AbstractDriver::$microtaskQueue | private | property | @var \SplQueue<array{\Closure, array}> | ||
AbstractDriver::$nextId | private | property | @var string Next callback identifier. | ||
AbstractDriver::$queueCallback | private | property | |||
AbstractDriver::$runCallback | private | property | |||
AbstractDriver::$stopped | private | property | |||
AbstractDriver::$suspensions | private | property | @var \WeakMap<object, \WeakReference<DriverSuspension>> | ||
AbstractDriver::cancel | public | function | Cancel a callback. | Overrides Driver::cancel | 3 |
AbstractDriver::createCallbackFiber | private | function | |||
AbstractDriver::createErrorCallback | private | function | |||
AbstractDriver::createLoopFiber | private | function | |||
AbstractDriver::defer | public | function | Defer the execution of a callback. | Overrides Driver::defer | |
AbstractDriver::delay | public | function | Delay the execution of a callback. | Overrides Driver::delay | |
AbstractDriver::disable | public | function | Disable a callback immediately. | Overrides Driver::disable | |
AbstractDriver::enable | public | function | Enable a callback to be active starting in the next tick. | Overrides Driver::enable | |
AbstractDriver::enqueueCallback | final protected | function | |||
AbstractDriver::error | final protected | function | Invokes the error handler with the given exception. | ||
AbstractDriver::getErrorHandler | public | function | Gets the error handler closure or {@code null} if none is set. | Overrides Driver::getErrorHandler | |
AbstractDriver::getIdentifiers | public | function | Returns all registered non-cancelled callback identifiers. | Overrides Driver::getIdentifiers | |
AbstractDriver::getSuspension | public | function | Returns an object used to suspend and resume execution of the current fiber or {main}. | Overrides Driver::getSuspension | |
AbstractDriver::getType | public | function | Returns the type of the callback identified by the given callback identifier. | Overrides Driver::getType | |
AbstractDriver::invokeCallbacks | private | function | |||
AbstractDriver::invokeInterrupt | private | function | |||
AbstractDriver::invokeMicrotasks | private | function | |||
AbstractDriver::isEmpty | private | function | |||
AbstractDriver::isEnabled | public | function | Returns whether the callback identified by the given callback identifier is currently enabled. | Overrides Driver::isEnabled | |
AbstractDriver::isReferenced | public | function | Returns whether the callback identified by the given callback identifier is currently referenced. | Overrides Driver::isReferenced | |
AbstractDriver::isRunning | public | function | Overrides Driver::isRunning | ||
AbstractDriver::onReadable | public | function | Execute a callback when a stream resource becomes readable or is closed for reading. | Overrides Driver::onReadable | |
AbstractDriver::onWritable | public | function | Execute a callback when a stream resource becomes writable or is closed for writing. | Overrides Driver::onWritable | |
AbstractDriver::queue | public | function | Queue a microtask. | Overrides Driver::queue | |
AbstractDriver::reference | public | function | Reference a callback. | Overrides Driver::reference | |
AbstractDriver::repeat | public | function | Repeatedly execute a callback. | Overrides Driver::repeat | |
AbstractDriver::run | public | function | Run the event loop. | Overrides Driver::run | 2 |
AbstractDriver::setErrorHandler | public | function | Set a callback to be executed when an error occurs. | Overrides Driver::setErrorHandler | |
AbstractDriver::setInterrupt | private | function | |||
AbstractDriver::stop | public | function | Stop the event loop. | Overrides Driver::stop | 2 |
AbstractDriver::tick | private | function | Executes a single tick of the event loop. | ||
AbstractDriver::unreference | public | function | Unreference a callback. | Overrides Driver::unreference | |
AbstractDriver::__debugInfo | public | function | Returns some useful information about the event loop. | Overrides Driver::__debugInfo | |
StreamSelectDriver::$readCallbacks | private | property | @var array<int, array<string, StreamReadableCallback>> | ||
StreamSelectDriver::$readStreams | private | property | @var array<int, resource> | ||
StreamSelectDriver::$signalCallbacks | private | property | @var array<int, array<string, SignalCallback>> | ||
StreamSelectDriver::$signalHandling | private | property | |||
StreamSelectDriver::$signalQueue | private | property | @var \SplQueue<int> | ||
StreamSelectDriver::$streamSelectErrorHandler | private | property | |||
StreamSelectDriver::$streamSelectIgnoreResult | private | property | |||
StreamSelectDriver::$timerQueue | private | property | |||
StreamSelectDriver::$writeCallbacks | private | property | @var array<int, array<string, StreamWritableCallback>> | ||
StreamSelectDriver::$writeStreams | private | property | @var array<int, resource> | ||
StreamSelectDriver::activate | protected | function | Activates (enables) all the given callbacks. | Overrides AbstractDriver::activate | |
StreamSelectDriver::deactivate | protected | function | Deactivates (disables) the given callback. | Overrides AbstractDriver::deactivate | |
StreamSelectDriver::dispatch | protected | function | Overrides AbstractDriver::dispatch | ||
StreamSelectDriver::getHandle | public | function | Get the underlying loop handle. | Overrides Driver::getHandle | |
StreamSelectDriver::getTimeout | private | function | |||
StreamSelectDriver::handleSignal | private | function | |||
StreamSelectDriver::now | protected | function | Returns the current event loop time in second increments. | Overrides AbstractDriver::now | |
StreamSelectDriver::onSignal | public | function | Overrides AbstractDriver::onSignal | ||
StreamSelectDriver::selectStreams | private | function | |||
StreamSelectDriver::__construct | public | function | Overrides AbstractDriver::__construct | ||
StreamSelectDriver::__destruct | public | function |