class AbstractDriver
Event loop driver which implements all basic operations to allow interoperability.
Callbacks (enabled or new callbacks) MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
All registered callbacks MUST NOT be called from a file with strict types enabled (`declare(strict_types=1)`).
@internal
Hierarchy
- class \Revolt\EventLoop\Internal\AbstractDriver implements \Revolt\EventLoop\Driver
Expanded class hierarchy of AbstractDriver
5 files declare their use of AbstractDriver
- EvDriver.php in vendor/
revolt/ event-loop/ src/ EventLoop/ Driver/ EvDriver.php - EventDriver.php in vendor/
revolt/ event-loop/ src/ EventLoop/ Driver/ EventDriver.php - EventLoop.php in vendor/
revolt/ event-loop/ src/ EventLoop.php - StreamSelectDriver.php in vendor/
revolt/ event-loop/ src/ EventLoop/ Driver/ StreamSelectDriver.php - UvDriver.php in vendor/
revolt/ event-loop/ src/ EventLoop/ Driver/ UvDriver.php
File
-
vendor/
revolt/ event-loop/ src/ EventLoop/ Internal/ AbstractDriver.php, line 24
Namespace
Revolt\EventLoop\InternalView source
abstract class AbstractDriver implements Driver {
/** @var string Next callback identifier. */
private string $nextId = "a";
private \Fiber $fiber;
private \Fiber $callbackFiber;
private \Closure $errorCallback;
/** @var array<string, DriverCallback> */
private array $callbacks = [];
/** @var array<string, DriverCallback> */
private array $enableQueue = [];
/** @var array<string, DriverCallback> */
private array $enableDeferQueue = [];
/** @var null|\Closure(\Throwable):void */
private ?\Closure $errorHandler = null;
/** @var null|\Closure():mixed */
private ?\Closure $interrupt = null;
private readonly \Closure $interruptCallback;
private readonly \Closure $queueCallback;
private readonly \Closure $runCallback;
private readonly \stdClass $internalSuspensionMarker;
/** @var \SplQueue<array{\Closure, array}> */
private readonly \SplQueue $microtaskQueue;
/** @var \SplQueue<DriverCallback> */
private readonly \SplQueue $callbackQueue;
private bool $idle = false;
private bool $stopped = false;
/** @var \WeakMap<object, \WeakReference<DriverSuspension>> */
private \WeakMap $suspensions;
public function __construct() {
if (\PHP_VERSION_ID < 80117 || \PHP_VERSION_ID >= 80200 && \PHP_VERSION_ID < 80204) {
// PHP GC is broken on early 8.1 and 8.2 versions, see https://github.com/php/php-src/issues/10496
if (!\getenv('REVOLT_DRIVER_SUPPRESS_ISSUE_10496')) {
throw new \Error('Your version of PHP is affected by serious garbage collector bugs related to fibers. Please upgrade to a newer version of PHP, i.e. >= 8.1.17 or => 8.2.4');
}
}
$this->suspensions = new \WeakMap();
$this->internalSuspensionMarker = new \stdClass();
$this->microtaskQueue = new \SplQueue();
$this->callbackQueue = new \SplQueue();
$this->createLoopFiber();
$this->createCallbackFiber();
$this->createErrorCallback();
/** @psalm-suppress InvalidArgument */
$this->interruptCallback = $this->setInterrupt(...);
$this->queueCallback = $this->queue(...);
$this->runCallback = function () {
if ($this->fiber
->isTerminated()) {
$this->createLoopFiber();
}
return $this->fiber
->isStarted() ? $this->fiber
->resume() : $this->fiber
->start();
};
}
public function run() : void {
if ($this->fiber
->isRunning()) {
throw new \Error("The event loop is already running");
}
if (\Fiber::getCurrent()) {
throw new \Error(\sprintf("Can't call %s() within a fiber (i.e., outside of {main})", __METHOD__));
}
if ($this->fiber
->isTerminated()) {
$this->createLoopFiber();
}
/** @noinspection PhpUnhandledExceptionInspection */
$lambda = $this->fiber
->isStarted() ? $this->fiber
->resume() : $this->fiber
->start();
if ($lambda) {
$lambda();
throw new \Error('Interrupt from event loop must throw an exception: ' . ClosureHelper::getDescription($lambda));
}
}
public function stop() : void {
$this->stopped = true;
}
public function isRunning() : bool {
return $this->fiber
->isRunning() || $this->fiber
->isSuspended();
}
public function queue(\Closure $closure, mixed ...$args) : void {
$this->microtaskQueue
->enqueue([
$closure,
$args,
]);
}
public function defer(\Closure $closure) : string {
$deferCallback = new DeferCallback($this->nextId++, $closure);
$this->callbacks[$deferCallback->id] = $deferCallback;
$this->enableDeferQueue[$deferCallback->id] = $deferCallback;
return $deferCallback->id;
}
public function delay(float $delay, \Closure $closure) : string {
if ($delay < 0) {
throw new \Error("Delay must be greater than or equal to zero");
}
$timerCallback = new TimerCallback($this->nextId++, $delay, $closure, $this->now() + $delay);
$this->callbacks[$timerCallback->id] = $timerCallback;
$this->enableQueue[$timerCallback->id] = $timerCallback;
return $timerCallback->id;
}
public function repeat(float $interval, \Closure $closure) : string {
if ($interval < 0) {
throw new \Error("Interval must be greater than or equal to zero");
}
$timerCallback = new TimerCallback($this->nextId++, $interval, $closure, $this->now() + $interval, true);
$this->callbacks[$timerCallback->id] = $timerCallback;
$this->enableQueue[$timerCallback->id] = $timerCallback;
return $timerCallback->id;
}
public function onReadable(mixed $stream, \Closure $closure) : string {
$streamCallback = new StreamReadableCallback($this->nextId++, $closure, $stream);
$this->callbacks[$streamCallback->id] = $streamCallback;
$this->enableQueue[$streamCallback->id] = $streamCallback;
return $streamCallback->id;
}
public function onWritable($stream, \Closure $closure) : string {
$streamCallback = new StreamWritableCallback($this->nextId++, $closure, $stream);
$this->callbacks[$streamCallback->id] = $streamCallback;
$this->enableQueue[$streamCallback->id] = $streamCallback;
return $streamCallback->id;
}
public function onSignal(int $signal, \Closure $closure) : string {
$signalCallback = new SignalCallback($this->nextId++, $closure, $signal);
$this->callbacks[$signalCallback->id] = $signalCallback;
$this->enableQueue[$signalCallback->id] = $signalCallback;
return $signalCallback->id;
}
public function enable(string $callbackId) : string {
if (!isset($this->callbacks[$callbackId])) {
throw InvalidCallbackError::invalidIdentifier($callbackId);
}
$callback = $this->callbacks[$callbackId];
if ($callback->enabled) {
return $callbackId;
// Callback already enabled.
}
$callback->enabled = true;
if ($callback instanceof DeferCallback) {
$this->enableDeferQueue[$callback->id] = $callback;
}
elseif ($callback instanceof TimerCallback) {
$callback->expiration = $this->now() + $callback->interval;
$this->enableQueue[$callback->id] = $callback;
}
else {
$this->enableQueue[$callback->id] = $callback;
}
return $callbackId;
}
public function cancel(string $callbackId) : void {
$this->disable($callbackId);
unset($this->callbacks[$callbackId]);
}
public function disable(string $callbackId) : string {
if (!isset($this->callbacks[$callbackId])) {
return $callbackId;
}
$callback = $this->callbacks[$callbackId];
if (!$callback->enabled) {
return $callbackId;
// Callback already disabled.
}
$callback->enabled = false;
$callback->invokable = false;
$id = $callback->id;
if ($callback instanceof DeferCallback) {
// Callback was only queued to be enabled.
unset($this->enableDeferQueue[$id]);
}
elseif (isset($this->enableQueue[$id])) {
// Callback was only queued to be enabled.
unset($this->enableQueue[$id]);
}
else {
$this->deactivate($callback);
}
return $callbackId;
}
public function reference(string $callbackId) : string {
if (!isset($this->callbacks[$callbackId])) {
throw InvalidCallbackError::invalidIdentifier($callbackId);
}
$this->callbacks[$callbackId]->referenced = true;
return $callbackId;
}
public function unreference(string $callbackId) : string {
if (!isset($this->callbacks[$callbackId])) {
return $callbackId;
}
$this->callbacks[$callbackId]->referenced = false;
return $callbackId;
}
public function getSuspension() : Suspension {
$fiber = \Fiber::getCurrent();
// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($fiber !== $this->fiber);
// Use queue closure in case of {main}, which can be unset by DriverSuspension after an uncaught exception.
$key = $fiber ?? $this->queueCallback;
$suspension = ($this->suspensions[$key] ?? null)?->get();
if ($suspension) {
return $suspension;
}
$suspension = new DriverSuspension($this->runCallback, $this->queueCallback, $this->interruptCallback, $this->suspensions);
$this->suspensions[$key] = \WeakReference::create($suspension);
return $suspension;
}
public function setErrorHandler(?\Closure $errorHandler) : void {
$this->errorHandler = $errorHandler;
}
public function getErrorHandler() : ?\Closure {
return $this->errorHandler;
}
public function __debugInfo() : array {
// @codeCoverageIgnoreStart
return \array_map(fn(DriverCallback $callback) => [
'type' => $this->getType($callback->id),
'enabled' => $callback->enabled,
'referenced' => $callback->referenced,
], $this->callbacks);
// @codeCoverageIgnoreEnd
}
public function getIdentifiers() : array {
return \array_keys($this->callbacks);
}
public function getType(string $callbackId) : CallbackType {
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return match ($callback::class) { DeferCallback::class => CallbackType::Defer,
TimerCallback::class => $callback->repeat ? CallbackType::Repeat : CallbackType::Delay,
StreamReadableCallback::class => CallbackType::Readable,
StreamWritableCallback::class => CallbackType::Writable,
SignalCallback::class => CallbackType::Signal,
};
}
public function isEnabled(string $callbackId) : bool {
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return $callback->enabled;
}
public function isReferenced(string $callbackId) : bool {
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return $callback->referenced;
}
/**
* Activates (enables) all the given callbacks.
*/
protected abstract function activate(array $callbacks) : void;
/**
* Dispatches any pending read/write, timer, and signal events.
*/
protected abstract function dispatch(bool $blocking) : void;
/**
* Deactivates (disables) the given callback.
*/
protected abstract function deactivate(DriverCallback $callback) : void;
protected final function enqueueCallback(DriverCallback $callback) : void {
$this->callbackQueue
->enqueue($callback);
$this->idle = false;
}
/**
* Invokes the error handler with the given exception.
*
* @param \Throwable $exception The exception thrown from an event callback.
*/
protected final function error(\Closure $closure, \Throwable $exception) : void {
if ($this->errorHandler === null) {
// Explicitly override the previous interrupt if it exists in this case, hiding the exception is worse
$this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingCallback($closure, $exception);
return;
}
$fiber = new \Fiber($this->errorCallback);
/** @noinspection PhpUnhandledExceptionInspection */
$fiber->start($this->errorHandler, $exception);
}
/**
* Returns the current event loop time in second increments.
*
* Note this value does not necessarily correlate to wall-clock time, rather the value returned is meant to be used
* in relative comparisons to prior values returned by this method (intervals, expiration calculations, etc.).
*/
protected abstract function now() : float;
private function invokeMicrotasks() : void {
while (!$this->microtaskQueue
->isEmpty()) {
[
$callback,
$args,
] = $this->microtaskQueue
->dequeue();
try {
// Clear $args to allow garbage collection
$callback(...$args, ...$args = []);
} catch (\Throwable $exception) {
$this->error($callback, $exception);
} finally {
FiberLocal::clear();
}
unset($callback, $args);
if ($this->interrupt) {
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
}
}
}
/**
* @return bool True if no enabled and referenced callbacks remain in the loop.
*/
private function isEmpty() : bool {
foreach ($this->callbacks as $callback) {
if ($callback->enabled && $callback->referenced) {
return false;
}
}
return true;
}
/**
* Executes a single tick of the event loop.
*/
private function tick(bool $previousIdle) : void {
$this->activate($this->enableQueue);
foreach ($this->enableQueue as $callback) {
$callback->invokable = true;
}
$this->enableQueue = [];
foreach ($this->enableDeferQueue as $callback) {
$callback->invokable = true;
$this->enqueueCallback($callback);
}
$this->enableDeferQueue = [];
$blocking = $previousIdle && !$this->stopped && !$this->isEmpty();
if ($blocking) {
$this->invokeCallbacks();
/** @psalm-suppress TypeDoesNotContainType */
if (!empty($this->enableDeferQueue) || !empty($this->enableQueue)) {
$blocking = false;
}
}
/** @psalm-suppress RedundantCondition */
$this->dispatch($blocking);
}
private function invokeCallbacks() : void {
while (!$this->microtaskQueue
->isEmpty() || !$this->callbackQueue
->isEmpty()) {
/** @noinspection PhpUnhandledExceptionInspection */
$yielded = $this->callbackFiber
->isStarted() ? $this->callbackFiber
->resume() : $this->callbackFiber
->start();
if ($yielded !== $this->internalSuspensionMarker) {
$this->createCallbackFiber();
}
if ($this->interrupt) {
$this->invokeInterrupt();
}
}
}
/**
* @param \Closure():mixed $interrupt
*/
private function setInterrupt(\Closure $interrupt) : void {
\assert($this->interrupt === null);
$this->interrupt = $interrupt;
}
private function invokeInterrupt() : void {
\assert($this->interrupt !== null);
$interrupt = $this->interrupt;
$this->interrupt = null;
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($interrupt);
}
private function createLoopFiber() : void {
$this->fiber = new \Fiber(function () : void {
$this->stopped = false;
// Invoke microtasks if we have some
$this->invokeCallbacks();
/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
while (!$this->stopped) {
if ($this->interrupt) {
$this->invokeInterrupt();
}
if ($this->isEmpty()) {
return;
}
$previousIdle = $this->idle;
$this->idle = true;
$this->tick($previousIdle);
$this->invokeCallbacks();
}
});
}
private function createCallbackFiber() : void {
$this->callbackFiber = new \Fiber(function () : void {
do {
$this->invokeMicrotasks();
while (!$this->callbackQueue
->isEmpty()) {
/** @var DriverCallback $callback */
$callback = $this->callbackQueue
->dequeue();
if (!isset($this->callbacks[$callback->id]) || !$callback->invokable) {
unset($callback);
continue;
}
if ($callback instanceof DeferCallback) {
$this->cancel($callback->id);
}
elseif ($callback instanceof TimerCallback) {
if (!$callback->repeat) {
$this->cancel($callback->id);
}
else {
// Disable and re-enable, so it's not executed repeatedly in the same tick
// See https://github.com/amphp/amp/issues/131
$this->disable($callback->id);
$this->enable($callback->id);
}
}
try {
$result = match (true) { $callback instanceof StreamCallback => ($callback->closure)($callback->id, $callback->stream),
$callback instanceof SignalCallback => ($callback->closure)($callback->id, $callback->signal),
default => ($callback->closure)($callback->id),
};
if ($result !== null) {
throw InvalidCallbackError::nonNullReturn($callback->id, $callback->closure);
}
} catch (\Throwable $exception) {
$this->error($callback->closure, $exception);
} finally {
FiberLocal::clear();
}
unset($callback);
if ($this->interrupt) {
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
}
$this->invokeMicrotasks();
}
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
} while (true);
});
}
private function createErrorCallback() : void {
$this->errorCallback = function (\Closure $errorHandler, \Throwable $exception) : void {
try {
$errorHandler($exception);
} catch (\Throwable $exception) {
$this->interrupt = static fn() => $exception instanceof UncaughtThrowable ? throw $exception : throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception);
}
};
}
}
Members
Title Sort descending | Modifiers | Object type | Summary | Overriden Title | Overrides |
---|---|---|---|---|---|
AbstractDriver::$callbackFiber | private | property | |||
AbstractDriver::$callbackQueue | private | property | @var \SplQueue<DriverCallback> | ||
AbstractDriver::$callbacks | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$enableDeferQueue | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$enableQueue | private | property | @var array<string, DriverCallback> | ||
AbstractDriver::$errorCallback | private | property | |||
AbstractDriver::$errorHandler | private | property | @var null|\Closure(\Throwable):void | ||
AbstractDriver::$fiber | private | property | |||
AbstractDriver::$idle | private | property | |||
AbstractDriver::$internalSuspensionMarker | private | property | |||
AbstractDriver::$interrupt | private | property | @var null|\Closure():mixed | ||
AbstractDriver::$interruptCallback | private | property | |||
AbstractDriver::$microtaskQueue | private | property | @var \SplQueue<array{\Closure, array}> | ||
AbstractDriver::$nextId | private | property | @var string Next callback identifier. | ||
AbstractDriver::$queueCallback | private | property | |||
AbstractDriver::$runCallback | private | property | |||
AbstractDriver::$stopped | private | property | |||
AbstractDriver::$suspensions | private | property | @var \WeakMap<object, \WeakReference<DriverSuspension>> | ||
AbstractDriver::activate | abstract protected | function | Activates (enables) all the given callbacks. | 4 | |
AbstractDriver::cancel | public | function | Cancel a callback. | Overrides Driver::cancel | 3 |
AbstractDriver::createCallbackFiber | private | function | |||
AbstractDriver::createErrorCallback | private | function | |||
AbstractDriver::createLoopFiber | private | function | |||
AbstractDriver::deactivate | abstract protected | function | Deactivates (disables) the given callback. | 4 | |
AbstractDriver::defer | public | function | Defer the execution of a callback. | Overrides Driver::defer | |
AbstractDriver::delay | public | function | Delay the execution of a callback. | Overrides Driver::delay | |
AbstractDriver::disable | public | function | Disable a callback immediately. | Overrides Driver::disable | |
AbstractDriver::dispatch | abstract protected | function | Dispatches any pending read/write, timer, and signal events. | 4 | |
AbstractDriver::enable | public | function | Enable a callback to be active starting in the next tick. | Overrides Driver::enable | |
AbstractDriver::enqueueCallback | final protected | function | |||
AbstractDriver::error | final protected | function | Invokes the error handler with the given exception. | ||
AbstractDriver::getErrorHandler | public | function | Gets the error handler closure or {@code null} if none is set. | Overrides Driver::getErrorHandler | |
AbstractDriver::getIdentifiers | public | function | Returns all registered non-cancelled callback identifiers. | Overrides Driver::getIdentifiers | |
AbstractDriver::getSuspension | public | function | Returns an object used to suspend and resume execution of the current fiber or {main}. | Overrides Driver::getSuspension | |
AbstractDriver::getType | public | function | Returns the type of the callback identified by the given callback identifier. | Overrides Driver::getType | |
AbstractDriver::invokeCallbacks | private | function | |||
AbstractDriver::invokeInterrupt | private | function | |||
AbstractDriver::invokeMicrotasks | private | function | |||
AbstractDriver::isEmpty | private | function | |||
AbstractDriver::isEnabled | public | function | Returns whether the callback identified by the given callback identifier is currently enabled. | Overrides Driver::isEnabled | |
AbstractDriver::isReferenced | public | function | Returns whether the callback identified by the given callback identifier is currently referenced. | Overrides Driver::isReferenced | |
AbstractDriver::isRunning | public | function | Overrides Driver::isRunning | ||
AbstractDriver::now | abstract protected | function | Returns the current event loop time in second increments. | 4 | |
AbstractDriver::onReadable | public | function | Execute a callback when a stream resource becomes readable or is closed for reading. | Overrides Driver::onReadable | |
AbstractDriver::onSignal | public | function | Execute a callback when a signal is received. | Overrides Driver::onSignal | 1 |
AbstractDriver::onWritable | public | function | Execute a callback when a stream resource becomes writable or is closed for writing. | Overrides Driver::onWritable | |
AbstractDriver::queue | public | function | Queue a microtask. | Overrides Driver::queue | |
AbstractDriver::reference | public | function | Reference a callback. | Overrides Driver::reference | |
AbstractDriver::repeat | public | function | Repeatedly execute a callback. | Overrides Driver::repeat | |
AbstractDriver::run | public | function | Run the event loop. | Overrides Driver::run | 2 |
AbstractDriver::setErrorHandler | public | function | Set a callback to be executed when an error occurs. | Overrides Driver::setErrorHandler | |
AbstractDriver::setInterrupt | private | function | |||
AbstractDriver::stop | public | function | Stop the event loop. | Overrides Driver::stop | 2 |
AbstractDriver::tick | private | function | Executes a single tick of the event loop. | ||
AbstractDriver::unreference | public | function | Unreference a callback. | Overrides Driver::unreference | |
AbstractDriver::__construct | public | function | 4 | ||
AbstractDriver::__debugInfo | public | function | Returns some useful information about the event loop. | Overrides Driver::__debugInfo | |
Driver::getHandle | public | function | Get the underlying loop handle. | 5 |