diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a779da5..f288451 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: - uses: shivammathur/setup-php@f3e473d116dcccaddc5834248c87452386958240 # ratchet:shivammathur/setup-php@v2 with: php-version: '8.4' - extensions: swoole + extensions: opentelemetry, protobuf, swoole tools: phpunit coverage: none diff --git a/composer.json b/composer.json index 093bfec..c2e257c 100755 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "require-dev": { "swoole/ide-helper": "6.*" }, - "suggests": { + "suggest": { "ext-mongodb": "Needed to support MongoDB database pools", "ext-redis": "Needed to support Redis cache pools", "ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools", @@ -50,8 +50,5 @@ "php-http/discovery": false, "tbachert/spi": false } - }, - "suggest": { - "ext-swoole": "Required to use the Swoole pool adapter" } } diff --git a/src/Pools/Connection.php b/src/Pools/Connection.php index 5e49b94..0cd9688 100644 --- a/src/Pools/Connection.php +++ b/src/Pools/Connection.php @@ -19,7 +19,9 @@ class Connection /** * @param TResource $resource */ - public function __construct(protected mixed $resource) {} + public function __construct(protected mixed $resource) + { + } /** * @return string diff --git a/src/Pools/Group.php b/src/Pools/Group.php index b8997d1..f25450b 100644 --- a/src/Pools/Group.php +++ b/src/Pools/Group.php @@ -68,28 +68,49 @@ public function use(array $names, callable $callback): mixed if (empty($names)) { throw new Exception('Cannot use with empty names'); } - return $this->useInternal($names, $callback); - } - /** - * Internal recursive callback for `use`. - * - * @template TReturn - * @param array $names Name of resources - * @param callable(mixed...): TReturn $callback Function that receives the connection resources - * @param array $resources - * @return TReturn - * @throws Exception - */ - private function useInternal(array $names, callable $callback, array $resources = []): mixed - { - if (empty($names)) { - return $callback(...$resources); + $connections = []; + $pools = []; + $starts = []; + $started = false; + $failed = false; + $thrown = null; + $result = null; + + try { + foreach ($names as $name) { + $pool = $this->get($name); + $starts[] = microtime(true); + $pools[] = $pool; + $connections[] = $pool->pop(); + } + + $started = true; + $result = $callback(...array_map(fn (Connection $connection) => $connection->getResource(), $connections)); + } catch (\Throwable $error) { + $thrown = $error; + $failed = $started; + } + + $releaseError = null; + + for ($i = \count($connections) - 1; $i >= 0; $i--) { + try { + $pools[$i]->release($connections[$i], $failed, $starts[$i]); + } catch (\Throwable $error) { + $releaseError ??= $error; + } + } + + if ($thrown !== null) { + throw $thrown; + } + + if ($releaseError !== null) { + throw $releaseError; } - return $this - ->get(array_shift($names)) - ->use(fn($resource) => $this->useInternal($names, $callback, array_merge($resources, [$resource]))); + return $result; } /** diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 0fac17a..827acd0 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -224,15 +224,88 @@ public function use(callable $callback): mixed { $start = microtime(true); $connection = null; + $failed = false; + try { $connection = $this->pop(); return $callback($connection->getResource()); + } catch (\Throwable $error) { + $failed = true; + throw $error; } finally { $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); if ($connection !== null) { - $this->reclaim($connection); + $this->release($connection, $failed); + } + } + } + + /** + * @param Connection $connection + * @return $this + * @internal + */ + public function release(Connection $connection, bool $failed = false, ?float $start = null): static + { + if ($start !== null) { + $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); + } + + if (!$failed) { + return $this->reclaim($connection); + } + + if ($this->recover($connection)) { + try { + return $this->reclaim($connection); + } catch (\Throwable) { + try { + return $this->destroy($connection); + } catch (\Throwable) { + return $this; + } + } + } + + try { + return $this->destroy($connection); + } catch (\Throwable) { + return $this; + } + } + + /** + * @param Connection $connection + */ + private function recover(Connection $connection): bool + { + $resource = $connection->getResource(); + + if (!\is_object($resource)) { + return !\is_resource($resource); + } + + try { + $recovered = false; + + if (\method_exists($resource, 'reset')) { + $recovered = true; + if ($resource->reset() === false) { + return false; + } } + + if (\method_exists($resource, 'reconnect')) { + $recovered = true; + if ($resource->reconnect() === false) { + return false; + } + } + } catch (\Throwable) { + return false; } + + return $recovered; } /** @@ -418,7 +491,7 @@ private function destroyConnection(?Connection $connection = null): static if ($shouldCreate) { try { $this->pool->push($this->createConnection()); - } catch (Exception $e) { + } catch (\Throwable $e) { $this->pool->synchronized(function (): void { $this->connectionsCreated--; }); diff --git a/tests/Pools/Adapter/SwooleTest.php b/tests/Pools/Adapter/SwooleTest.php index c74df13..5a1af2d 100644 --- a/tests/Pools/Adapter/SwooleTest.php +++ b/tests/Pools/Adapter/SwooleTest.php @@ -343,7 +343,7 @@ public function testSwooleCoroutineStressTest(): void } public function testInitOutsideCoroutineNotThrowAnyError(): void { - $pool = new Pool(new Swoole(), 'test', 1, fn() => 'x'); + $pool = new Pool(new Swoole(), 'test', 1, fn () => 'x'); $this->assertInstanceOf(Pool::class, $pool); } } diff --git a/tests/Pools/Scopes/ConnectionTestScope.php b/tests/Pools/Scopes/ConnectionTestScope.php index 67632d4..ebf3c62 100644 --- a/tests/Pools/Scopes/ConnectionTestScope.php +++ b/tests/Pools/Scopes/ConnectionTestScope.php @@ -69,7 +69,7 @@ public function testConnectionSetPool(): void { $this->execute(function (): void { $this->setUpConnection(); - $pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); $this->assertNull($this->connectionObject->getPool()); $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); @@ -80,7 +80,7 @@ public function testConnectionGetPool(): void { $this->execute(function (): void { $this->setUpConnection(); - $pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); $this->assertNull($this->connectionObject->getPool()); $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); @@ -99,7 +99,7 @@ public function testConnectionGetPool(): void public function testConnectionReclaim(): void { $this->execute(function (): void { - $pool = new Pool($this->getAdapter(), 'test', 2, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 2, fn () => 'x'); $this->assertSame(2, $pool->count()); diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index be4a5a1..65c5653 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -3,8 +3,10 @@ namespace Utopia\Tests\Scopes; use Exception; +use Utopia\Pools\Connection; use Utopia\Pools\Group; use Utopia\Pools\Pool; +use Utopia\Telemetry\Adapter\Test as TestTelemetry; trait GroupTestScope { @@ -22,7 +24,7 @@ public function testGroupAdd(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); }); @@ -32,7 +34,7 @@ public function testGroupGet(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); @@ -46,7 +48,7 @@ public function testGroupRemove(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); @@ -62,7 +64,7 @@ public function testGroupReset(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(5, $this->groupObject->get('test')->count()); @@ -82,7 +84,7 @@ public function testGroupReconnectAttempts(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(3, $this->groupObject->get('test')->getReconnectAttempts()); @@ -96,7 +98,7 @@ public function testGroupReconnectSleep(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(1, $this->groupObject->get('test')->getReconnectSleep()); @@ -110,9 +112,9 @@ public function testGroupUse(): void { $this->execute(function (): void { $this->setUpGroup(); - $pool1 = new Pool($this->getAdapter(), 'pool1', 1, fn() => '1'); - $pool2 = new Pool($this->getAdapter(), 'pool2', 1, fn() => '2'); - $pool3 = new Pool($this->getAdapter(), 'pool3', 1, fn() => '3'); + $pool1 = new Pool($this->getAdapter(), 'pool1', 1, fn () => '1'); + $pool2 = new Pool($this->getAdapter(), 'pool2', 1, fn () => '2'); + $pool3 = new Pool($this->getAdapter(), 'pool3', 1, fn () => '3'); $this->groupObject->add($pool1); $this->groupObject->add($pool2); @@ -137,4 +139,112 @@ public function testGroupUse(): void $this->assertSame(1, $pool3->count()); }); } + + public function testGroupUseReclaimsEarlierConnectionWhenLaterPoolIsMissing(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $created = 0; + $resources = []; + $pool = new Pool($this->getAdapter(), 'pool1', 1, function () use (&$created, &$resources) { + $created++; + $resources[] = new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + }; + return $resources[$created - 1]; + }); + + $this->groupObject->add($pool); + + try { + $this->groupObject->use(['pool1', 'missing'], function (): void { + }); + $this->fail('Should have thrown'); + } catch (Exception) { + // expected + } + + $this->assertSame(1, $pool->count()); + + $pool->use(function (\Stringable $resource) use (&$resources): void { + $this->assertSame($resources[0], $resource); + $this->assertSame('resource-1', (string) $resource); + }); + $this->assertSame(1, $created); + }); + } + + public function testGroupUseRecordsUseDurationTelemetry(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $telemetry = new TestTelemetry(); + + $this->groupObject + ->add(new Pool($this->getAdapter(), 'pool1', 1, fn () => '1')) + ->setTelemetry($telemetry); + + $this->assertArrayNotHasKey('pool.connection.use_time', $telemetry->histograms); + + $this->groupObject->use(['pool1'], function (...$resources): void { + $this->assertSame(['1'], $resources); + }); + + $this->assertArrayHasKey('pool.connection.use_time', $telemetry->histograms); + /** @var object{values: array} $useHistogram */ + $useHistogram = $telemetry->histograms['pool.connection.use_time']; + $this->assertCount(1, $useHistogram->values); + }); + } + + public function testGroupUseReleasesEveryConnectionWhenCleanupThrows(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + + $pool1 = new class ($this->getAdapter(), 'pool1', 1, fn () => '1') extends Pool { + public bool $released = false; + + public function release(Connection $connection, bool $failed = false, ?float $start = null): static + { + $this->released = true; + return parent::release($connection, $failed, $start); + } + }; + $pool2 = new class ($this->getAdapter(), 'pool2', 1, fn () => '2') extends Pool { + public bool $released = false; + + public function release(Connection $connection, bool $failed = false, ?float $start = null): static + { + $this->released = true; + throw new \RuntimeException('Release failed'); + } + }; + + $this->groupObject + ->add($pool1) + ->add($pool2); + + $error = null; + try { + $this->groupObject->use(['pool1', 'pool2'], function (...$resources): void { + $this->assertSame(['1', '2'], $resources); + }); + } catch (\RuntimeException $error) { + } + + $this->assertInstanceOf(\RuntimeException::class, $error); + $this->assertSame('Release failed', $error->getMessage()); + $this->assertTrue($pool1->released); + $this->assertTrue($pool2->released); + $this->assertSame(1, $pool1->count()); + }); + } } diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index d9b3350..532ffef 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -19,7 +19,7 @@ abstract protected function execute(callable $callback): mixed; protected function setUpPool(): void { - $this->poolObject = new Pool($this->getAdapter(), 'test', 5, fn() => 'x'); + $this->poolObject = new Pool($this->getAdapter(), 'test', 5, fn () => 'x'); } public function testPoolGetName(): void @@ -364,23 +364,208 @@ public function testPoolEmptyErrorIncludesActiveCount(): void }); } - public function testUseReclainsConnectionOnCallbackException(): void + public function testUseDestroysConnectionWhenRecoveryFails(): void { $this->execute(function (): void { - $this->setUpPool(); // size 5 + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-on-error', 2, function () use (&$created) { + $created++; + return new class ('resource-' . $created, $created === 1) implements \Stringable { + public function __construct(private string $name, private bool $failRecovery) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): void + { + if ($this->failRecovery) { + throw new \RuntimeException('Recovery failed'); + } + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); - // use() should reclaim the connection even when callback throws try { - $this->poolObject->use(function ($resource): void { - $this->assertSame(4, $this->poolObject->count()); + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); throw new \RuntimeException('Callback failed'); }); } catch (\RuntimeException) { // expected } - // Connection should be reclaimed, pool back to full - $this->assertSame(5, $this->poolObject->count()); + $this->assertSame(2, $pool->count()); + + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); + }); + }); + } + + public function testUseDestroysConnectionWhenRecoveryReturnsFalse(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-on-false-recovery', 2, function () use (&$created) { + $created++; + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): bool + { + return false; + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); + }); + }); + } + + public function testUseDestroysObjectConnectionWithoutRecoveryHooks(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-without-recovery', 2, function () use (&$created) { + $created++; + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); + }); + }); + } + + public function testUseDestroysNativeResourceConnectionAfterCallbackFailure(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-native-resource', 2, function () use (&$created) { + $created++; + $resource = fopen('php://temp', 'r+'); + if ($resource === false) { + throw new \RuntimeException('Failed to open stream'); + } + + fwrite($resource, 'resource-' . $created); + rewind($resource); + + return $resource; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function ($resource): void { + $this->assertIsResource($resource); + $this->assertSame('resource-1', stream_get_contents($resource)); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function ($resource): void { + $this->assertIsResource($resource); + $this->assertSame('resource-2', stream_get_contents($resource)); + }); + }); + } + + public function testUsePreservesCallbackExceptionWhenReplacementFails(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-preserve-callback-error', 1, function () use (&$created) { + $created++; + if ($created > 1) { + throw new \TypeError('Replacement failed'); + } + + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): void + { + throw new \RuntimeException('Recovery failed'); + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + $error = null; + try { + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); + throw new \LogicException('Callback failed'); + }); + } catch (\LogicException $error) { + } + + $this->assertInstanceOf(\LogicException::class, $error); + $this->assertSame('Callback failed', $error->getMessage()); + $this->assertSame(1, $pool->count()); }); }