Skip to main content
Drupal API
User account menu
  • Log in

Breadcrumb

  1. Drupal Core 11.1.x
  2. DoctrineDbalPostgreSqlStore.php

class DoctrineDbalPostgreSqlStore

DoctrineDbalPostgreSqlStore is a PersistingStoreInterface implementation using PostgreSql advisory locks with a Doctrine DBAL Connection.

@author Jérémy Derussé <jeremy@derusse.com>

Hierarchy

  • class \Symfony\Component\Lock\Store\DoctrineDbalPostgreSqlStore implements \Symfony\Component\Lock\BlockingSharedLockStoreInterface, \Symfony\Component\Lock\BlockingStoreInterface

Expanded class hierarchy of DoctrineDbalPostgreSqlStore

File

vendor/symfony/lock/Store/DoctrineDbalPostgreSqlStore.php, line 33

Namespace

Symfony\Component\Lock\Store
View source
class DoctrineDbalPostgreSqlStore implements BlockingSharedLockStoreInterface, BlockingStoreInterface {
    private Connection $conn;
    private static array $storeRegistry = [];
    
    /**
     * You can either pass an existing database connection a Doctrine DBAL Connection
     * or a URL that will be used to connect to the database.
     *
     * @throws InvalidArgumentException When first argument is not Connection nor string
     */
    public function __construct(Connection|string $connOrUrl) {
        if ($connOrUrl instanceof Connection) {
            if (!$connOrUrl->getDatabasePlatform() instanceof PostgreSQLPlatform) {
                throw new InvalidArgumentException(\sprintf('The adapter "%s" does not support the "%s" platform.', __CLASS__, $connOrUrl->getDatabasePlatform()::class));
            }
            $this->conn = $connOrUrl;
        }
        else {
            if (!class_exists(DriverManager::class)) {
                throw new InvalidArgumentException('Failed to parse DSN. Try running "composer require doctrine/dbal".');
            }
            $params = (new DsnParser([
                'db2' => 'ibm_db2',
                'mssql' => 'pdo_sqlsrv',
                'mysql' => 'pdo_mysql',
                'mysql2' => 'pdo_mysql',
                'postgres' => 'pdo_pgsql',
                'postgresql' => 'pdo_pgsql',
                'pgsql' => 'pdo_pgsql',
                'sqlite' => 'pdo_sqlite',
                'sqlite3' => 'pdo_sqlite',
            ]))->parse($this->filterDsn($connOrUrl));
            $config = new Configuration();
            $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());
            $this->conn = DriverManager::getConnection($params, $config);
        }
    }
    public function save(Key $key) : void {
        // prevent concurrency within the same connection
        $this->getInternalStore()
            ->save($key);
        $lockAcquired = false;
        try {
            $sql = 'SELECT pg_try_advisory_lock(:key)';
            $result = $this->conn
                ->executeQuery($sql, [
                'key' => $this->getHashedKey($key),
            ]);
            // Check if lock is acquired
            if (true === $result->fetchOne()) {
                $key->markUnserializable();
                // release sharedLock in case of promotion
                $this->unlockShared($key);
                $lockAcquired = true;
                return;
            }
        } finally {
            if (!$lockAcquired) {
                $this->getInternalStore()
                    ->delete($key);
            }
        }
        throw new LockConflictedException();
    }
    public function saveRead(Key $key) : void {
        // prevent concurrency within the same connection
        $this->getInternalStore()
            ->saveRead($key);
        $lockAcquired = false;
        try {
            $sql = 'SELECT pg_try_advisory_lock_shared(:key)';
            $result = $this->conn
                ->executeQuery($sql, [
                'key' => $this->getHashedKey($key),
            ]);
            // Check if lock is acquired
            if (true === $result->fetchOne()) {
                $key->markUnserializable();
                // release lock in case of demotion
                $this->unlock($key);
                $lockAcquired = true;
                return;
            }
        } finally {
            if (!$lockAcquired) {
                $this->getInternalStore()
                    ->delete($key);
            }
        }
        throw new LockConflictedException();
    }
    public function putOffExpiration(Key $key, float $ttl) : void {
        // postgresql locks forever.
        // check if lock still exists
        if (!$this->exists($key)) {
            throw new LockConflictedException();
        }
    }
    public function delete(Key $key) : void {
        // Prevent deleting locks own by an other key in the same connection
        if (!$this->exists($key)) {
            return;
        }
        $this->unlock($key);
        // Prevent deleting Readlocks own by current key AND an other key in the same connection
        $store = $this->getInternalStore();
        try {
            // If lock acquired = there is no other ReadLock
            $store->save($key);
            $this->unlockShared($key);
        } catch (LockConflictedException) {
            // an other key exists in this ReadLock
        }
        $store->delete($key);
    }
    public function exists(Key $key) : bool {
        $sql = "SELECT count(*) FROM pg_locks WHERE locktype='advisory' AND objid=:key AND pid=pg_backend_pid()";
        $result = $this->conn
            ->executeQuery($sql, [
            'key' => $this->getHashedKey($key),
        ]);
        if ($result->fetchOne() > 0) {
            // connection is locked, check for lock in internal store
            return $this->getInternalStore()
                ->exists($key);
        }
        return false;
    }
    public function waitAndSave(Key $key) : void {
        // prevent concurrency within the same connection
        // Internal store does not allow blocking mode, because there is no way to acquire one in a single process
        $this->getInternalStore()
            ->save($key);
        $lockAcquired = false;
        $sql = 'SELECT pg_advisory_lock(:key)';
        try {
            $this->conn
                ->executeStatement($sql, [
                'key' => $this->getHashedKey($key),
            ]);
            $lockAcquired = true;
        } finally {
            if (!$lockAcquired) {
                $this->getInternalStore()
                    ->delete($key);
            }
        }
        // release lock in case of promotion
        $this->unlockShared($key);
    }
    public function waitAndSaveRead(Key $key) : void {
        // prevent concurrency within the same connection
        // Internal store does not allow blocking mode, because there is no way to acquire one in a single process
        $this->getInternalStore()
            ->saveRead($key);
        $lockAcquired = false;
        $sql = 'SELECT pg_advisory_lock_shared(:key)';
        try {
            $this->conn
                ->executeStatement($sql, [
                'key' => $this->getHashedKey($key),
            ]);
            $lockAcquired = true;
        } finally {
            if (!$lockAcquired) {
                $this->getInternalStore()
                    ->delete($key);
            }
        }
        // release lock in case of demotion
        $this->unlock($key);
    }
    
    /**
     * Returns a hashed version of the key.
     */
    private function getHashedKey(Key $key) : int {
        return crc32((string) $key);
    }
    private function unlock(Key $key) : void {
        do {
            $sql = "SELECT pg_advisory_unlock(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ExclusiveLock' AND objid=:key AND pid=pg_backend_pid()";
            $result = $this->conn
                ->executeQuery($sql, [
                'key' => $this->getHashedKey($key),
            ]);
        } while (0 !== $result->rowCount());
    }
    private function unlockShared(Key $key) : void {
        do {
            $sql = "SELECT pg_advisory_unlock_shared(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ShareLock' AND objid=:key AND pid=pg_backend_pid()";
            $result = $this->conn
                ->executeQuery($sql, [
                'key' => $this->getHashedKey($key),
            ]);
        } while (0 !== $result->rowCount());
    }
    
    /**
     * Check driver and remove scheme extension from DSN.
     * From pgsql+advisory://server/ to pgsql://server/.
     *
     * @throws InvalidArgumentException when driver is not supported
     */
    private function filterDsn(string $dsn) : string {
        if (!str_contains($dsn, '://')) {
            throw new InvalidArgumentException('DSN is invalid for Doctrine DBAL.');
        }
        [
            $scheme,
            $rest,
        ] = explode(':', $dsn, 2);
        $driver = substr($scheme, 0, strpos($scheme, '+') ?: null);
        if (!\in_array($driver, [
            'pgsql',
            'postgres',
            'postgresql',
        ])) {
            throw new InvalidArgumentException(\sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, $driver));
        }
        return \sprintf('%s:%s', $driver, $rest);
    }
    private function getInternalStore() : SharedLockStoreInterface {
        $namespace = spl_object_hash($this->conn);
        return self::$storeRegistry[$namespace] ??= new InMemoryStore();
    }

}

Members

Title Sort descending Modifiers Object type Summary Overriden Title
DoctrineDbalPostgreSqlStore::$conn private property
DoctrineDbalPostgreSqlStore::$storeRegistry private static property
DoctrineDbalPostgreSqlStore::delete public function Removes a resource from the storage. Overrides PersistingStoreInterface::delete
DoctrineDbalPostgreSqlStore::exists public function Returns whether or not the resource exists in the storage. Overrides PersistingStoreInterface::exists
DoctrineDbalPostgreSqlStore::filterDsn private function Check driver and remove scheme extension from DSN.
From pgsql+advisory://server/ to pgsql://server/.
DoctrineDbalPostgreSqlStore::getHashedKey private function Returns a hashed version of the key.
DoctrineDbalPostgreSqlStore::getInternalStore private function
DoctrineDbalPostgreSqlStore::putOffExpiration public function Extends the TTL of a resource. Overrides PersistingStoreInterface::putOffExpiration
DoctrineDbalPostgreSqlStore::save public function Stores the resource if it&#039;s not locked by someone else. Overrides PersistingStoreInterface::save
DoctrineDbalPostgreSqlStore::saveRead public function Stores the resource if it&#039;s not locked for reading by someone else. Overrides SharedLockStoreInterface::saveRead
DoctrineDbalPostgreSqlStore::unlock private function
DoctrineDbalPostgreSqlStore::unlockShared private function
DoctrineDbalPostgreSqlStore::waitAndSave public function Waits until a key becomes free, then stores the resource. Overrides BlockingStoreInterface::waitAndSave
DoctrineDbalPostgreSqlStore::waitAndSaveRead public function Waits until a key becomes free for reading, then stores the resource. Overrides BlockingSharedLockStoreInterface::waitAndSaveRead
DoctrineDbalPostgreSqlStore::__construct public function You can either pass an existing database connection a Doctrine DBAL Connection
or a URL that will be used to connect to the database.

API Navigation

  • Drupal Core 11.1.x
  • Topics
  • Classes
  • Functions
  • Constants
  • Globals
  • Files
  • Namespaces
  • Deprecated
  • Services
RSS feed
Powered by Drupal