From 6b1a22b6662f78aab5b51fd41bfcb4576c831c65 Mon Sep 17 00:00:00 2001 From: marcopiraccini Date: Tue, 19 May 2026 11:26:36 +0200 Subject: [PATCH 1/2] feat: add coordinatorPlugin with auto-register reply-from + helper decorators Consumers no longer need a separate `npm i @fastify/reply-from` step plus a manual `app.register(replyFrom)` and `new Registry(...)` to use the Fastify helpers. `app.register(coordinatorPlugin, { redis, keyPrefix, ... })` now: - Registers @fastify/reply-from (idempotently, via hasReplyDecorator check) - Constructs a Registry from the passed options (or accepts an existing one via `registry`, in which case the plugin will not close it) - Decorates the Fastify instance with `coordinator` (renamable via `decorateAs`), exposing: - `app.coordinator.registry` (the underlying Registry) - `app.coordinator.lookupAndProxy(opts)` - `app.coordinator.lookupLockAndProxy(opts)` - `app.coordinator.pickAndRegister(opts)` - `app.coordinator.lookupAndDeregister(opts)` - `app.coordinator.proxyVia(resolve, opts)` - Closes the registry on `app.close()` (only when the plugin created it) The existing standalone helper imports (`lookupAndProxy`, etc.) keep working; they remain documented as the advanced/manual path. Also: - Adds Postgres to docker-compose.yml (host port 15432) so test:deps:up brings up both Redis and Postgres in one step. - Updates the storage-db example to register the new coordinatorPlugin instead of doing reply-from + Registry wiring by hand. - Updates e2e test defaults to match the new compose ports (REDIS_URL=:6390, PG_URL=:15432). CI workflow updated accordingly. - Renames script aliases to test:deps:up / test:deps:down; drops the now-redundant test:redis:* aliases. Co-Authored-By: Claude Opus 4.7 --- .github/workflows/ci.yml | 6 +- README.md | 104 ++++++++++-- docker-compose.yml | 15 ++ examples/storage-db/package.json | 1 - examples/storage-db/src/bin/coordinator.ts | 4 +- examples/storage-db/src/coordinator-plugin.ts | 40 ++--- package.json | 5 +- pnpm-lock.yaml | 6 +- src/index.ts | 3 + src/plugin.ts | 96 +++++++++++ test/e2e/storage-db.test.ts | 4 +- test/plugin.test.ts | 159 ++++++++++++++++++ 12 files changed, 395 insertions(+), 48 deletions(-) create mode 100644 src/plugin.ts create mode 100644 test/plugin.test.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 03af4fe..f530c2e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,8 +54,10 @@ jobs: POSTGRES_USER: storage POSTGRES_PASSWORD: storage POSTGRES_DB: storage + # Map to host 15432 (matches docker-compose.yml so local devs avoid + # clashing with any other Postgres on 5432). ports: - - 5432:5432 + - 15432:5432 options: >- --health-cmd "pg_isready -U storage -d storage" --health-interval 10s @@ -66,7 +68,7 @@ jobs: node-version: [22, 24, 26] env: REDIS_URL: redis://127.0.0.1:6390 - PG_URL: postgresql://storage:storage@127.0.0.1:5432/storage + PG_URL: postgresql://storage:storage@127.0.0.1:15432/storage steps: - uses: actions/checkout@v4 - uses: pnpm/action-setup@v4 diff --git a/README.md b/README.md index 651ec11..d17a7d5 100644 --- a/README.md +++ b/README.md @@ -24,13 +24,69 @@ This library handles all of that: npm install @platformatic/coordinator ``` -For the Fastify helpers, also: +Peer dependency: `fastify >= 5` when using the Fastify plugin or helpers. `@fastify/reply-from` is a runtime dependency of this package — you don't need to install it separately. -```sh -npm install @fastify/reply-from +## Quick start (Fastify plugin) + +```ts +import Fastify from 'fastify' +import coordinatorPlugin from '@platformatic/coordinator' + +const app = Fastify() + +await app.register(coordinatorPlugin, { + redis: 'redis://valkey:6379', + keyPrefix: 'myservice', + strategy: 'least-loaded' +}) + +app.get('/destinations/:id/work', app.coordinator.lookupAndProxy({ + destinationFrom: req => req.params.id, + claimOnMiss: true, + reassignOrphans: true +})) + +app.post('/destinations', app.coordinator.pickAndRegister({ + registerIdFrom: res => res.id +})) + +app.delete('/destinations/:id', app.coordinator.lookupAndDeregister({ + destinationFrom: req => req.params.id +})) + +app.post('/transactions/:lockId/work', app.coordinator.lookupLockAndProxy({ + lockFrom: req => req.params.lockId +})) + +await app.listen({ port: 3000 }) ``` -Peer dependency: `fastify >= 5` when using the Fastify helpers. +The plugin: + +- Registers `@fastify/reply-from` (idempotently — skipped if already registered) +- Constructs a `Registry` from the passed options (or reuses one you provide via `registry`) +- Exposes both the registry and the route-handler-factory helpers on `app.coordinator` +- Closes the registry on `app.close()` (unless you brought your own) + +Options: + +```ts +interface CoordinatorPluginOptions { + // Forwarded to new Registry(...) when no `registry` is supplied: + redis?: string // redis/valkey URL + keyPrefix?: string // default 'coordinator' + strategy?: 'round-robin' | 'least-loaded' | 'random' | AllocationStrategy + cache?: { ttl?: number, max?: number } | false + requestTimeout?: number + + registry?: Registry // reuse an existing Registry (plugin will not close it) + decorateAs?: string // default 'coordinator' + replyFrom?: FastifyReplyFromOptions // forwarded to @fastify/reply-from + registerReplyFrom?: boolean // default true; set false if you already registered reply-from +} +``` + +The legacy standalone helper imports (`import { lookupAndProxy } from '@platformatic/coordinator'`) still work and are documented below. They require manually registering `@fastify/reply-from` and constructing the `Registry`. ## Valkey layout @@ -148,9 +204,22 @@ Built-in least-loaded reads `load` from each candidate's member record (`HGET` p `resolveDestination` checks a local LRU+TTL cache before reading Valkey. Default 5 s TTL, 10 000 entries. Configure with `cache: { ttl, max }` or disable with `cache: false`. Writes through the registry (`addPodToDestination`, `deregisterDestination`) evict the affected key. Each replica has its own cache. -## Fastify helpers +## Fastify helpers (standalone, advanced) + +The helpers used internally by `app.coordinator.*` are also exported as standalone functions, for users who want to manage their own `Registry` and reply-from registration. Each emits a tagged result via an optional `onResult` callback so presets can hook their own metric counters. -For HTTP-based coordinators, three helpers wrap the common patterns. Each emits a tagged result via an optional `onResult` callback so presets can hook their own metric counters. +Before mounting any helper-backed route you must register `@fastify/reply-from` (the `coordinatorPlugin` does this for you): + +```ts +import Fastify from 'fastify' +import replyFrom from '@fastify/reply-from' +import { Registry, lookupAndProxy } from '@platformatic/coordinator' + +const app = Fastify() +await app.register(replyFrom) +const registry = new Registry({ redis, keyPrefix }) +app.get('/x/:id', lookupAndProxy(registry, { destinationFrom: r => r.params.id })) +``` ### `lookupAndProxy` @@ -184,19 +253,30 @@ app.delete('/destinations/:id', lookupAndDeregister(registry, { Resolves, proxies the delete; on `expectedStatus` (204 by default), `DEL`s the destination set. If the destination has only dead pods, skips the proxy and just deletes the set ("deregistered_dead_pod"). -All three helpers go through `@fastify/reply-from`, which the host application must register once before any helper-backed route is mounted. +All four helpers go through `@fastify/reply-from`. The `coordinatorPlugin` registers it automatically; if you use the standalone helpers, you must register it yourself. + +### `lookupLockAndProxy` + +```ts +app.post('/transactions/:lockId/work', lookupLockAndProxy(registry, { + lockFrom: req => req.params.lockId +})) +``` + +Resolves the lockId to the pod that owns it (via `Registry.resolveLock`) and proxies through. 404s if the lockId is unknown. ## Testing -Tests use Redis on `127.0.0.1:6390`. A `docker-compose.yml` is included. +Unit tests need a Redis on `127.0.0.1:6390`. E2E tests also need a Postgres on `127.0.0.1:15432` (storage/storage/storage). Both are in the included `docker-compose.yml`. ```sh -pnpm run test:redis:up -pnpm test -pnpm run test:redis:down +pnpm run test:deps:up # brings up redis + postgres +pnpm test # unit tests +pnpm run test:e2e # end-to-end tests (uses the storage-db example) +pnpm run test:deps:down ``` -The URL is read from `REDIS_URL` (default `redis://127.0.0.1:6390`). Tests isolate keys with a random prefix and clean up after themselves. +URLs are read from `REDIS_URL` (default `redis://127.0.0.1:6390`) and `PG_URL` (default `postgresql://storage:storage@127.0.0.1:15432/storage`). Tests isolate keys with a random prefix and clean up after themselves. ## License diff --git a/docker-compose.yml b/docker-compose.yml index 4a31a16..556527f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,3 +9,18 @@ services: interval: 2s timeout: 2s retries: 10 + + postgres: + image: postgres:17-alpine + container_name: coordinator-test-postgres + ports: + - "15432:5432" + environment: + POSTGRES_USER: storage + POSTGRES_PASSWORD: storage + POSTGRES_DB: storage + healthcheck: + test: ["CMD-SHELL", "pg_isready -U storage -d storage"] + interval: 2s + timeout: 2s + retries: 10 diff --git a/examples/storage-db/package.json b/examples/storage-db/package.json index 04a132a..3fe64a1 100644 --- a/examples/storage-db/package.json +++ b/examples/storage-db/package.json @@ -15,7 +15,6 @@ "smoke": "./scripts/smoke.sh" }, "dependencies": { - "@fastify/reply-from": "^12.6.2", "@platformatic/coordinator": "workspace:*", "fastify": "^5.3.2", "fastify-plugin": "^5.0.1", diff --git a/examples/storage-db/src/bin/coordinator.ts b/examples/storage-db/src/bin/coordinator.ts index 7d7800e..1b83cec 100644 --- a/examples/storage-db/src/bin/coordinator.ts +++ b/examples/storage-db/src/bin/coordinator.ts @@ -1,6 +1,6 @@ import Fastify from 'fastify' import { Registry } from '@platformatic/coordinator' -import { coordinatorPlugin } from '../coordinator-plugin.ts' +import { storageDbCoordinatorPlugin } from '../coordinator-plugin.ts' const env = (key: string, fallback?: string): string => { const v = process.env[key] ?? fallback @@ -23,7 +23,7 @@ const registry = new Registry({ }) const app = Fastify({ logger: { level: process.env.LOG_LEVEL ?? 'info' } }) -await app.register(coordinatorPlugin, { registry }) +await app.register(storageDbCoordinatorPlugin, { registry }) const shutdown = async (): Promise => { try { await registry.close() } catch { /* ignore */ } diff --git a/examples/storage-db/src/coordinator-plugin.ts b/examples/storage-db/src/coordinator-plugin.ts index 1ab9b58..065c80b 100644 --- a/examples/storage-db/src/coordinator-plugin.ts +++ b/examples/storage-db/src/coordinator-plugin.ts @@ -1,13 +1,6 @@ import fp from 'fastify-plugin' -import replyFrom from '@fastify/reply-from' import type { FastifyInstance, FastifyRequest } from 'fastify' -import { - Registry, - lookupAndProxy, - pickAndRegister, - lookupAndDeregister, - lookupLockAndProxy -} from '@platformatic/coordinator' +import { type Registry, coordinatorPlugin } from '@platformatic/coordinator' interface TenantParams { tenantId: string } interface LockParams { lockId: string } @@ -54,45 +47,44 @@ const lockKeySchema = { } } as const -async function coordinatorRoutes (app: FastifyInstance, opts: CoordinatorOptions): Promise { - const { registry } = opts - await app.register(replyFrom) +async function storageDbRoutes (app: FastifyInstance, opts: CoordinatorOptions): Promise { + await app.register(coordinatorPlugin, { registry: opts.registry }) const tenantFrom = (req: FastifyRequest): string => (req.params as TenantParams).tenantId + const lockFrom = (req: FastifyRequest): string => (req.params as LockParams).lockId app.get('/pods', async () => { - const members = await registry.listLiveMembers() + const members = await app.coordinator.registry.listLiveMembers() return { count: members.length, members } }) - app.post('/tenants/:tenantId', { schema: tenantSchema }, pickAndRegister(registry, { + app.post('/tenants/:tenantId', { schema: tenantSchema }, app.coordinator.pickAndRegister({ registerIdFrom: (body: any) => body.tenantId, expectedStatus: 201, unavailableMessage: 'no pods available' })) - const proxyOpts = { + const tenantProxy = app.coordinator.lookupAndProxy({ destinationFrom: tenantFrom, reassignOrphans: true, notFoundMessage: 'tenant not found' - } + }) - app.get('/tenants/:tenantId/keys', { schema: tenantSchema }, lookupAndProxy(registry, proxyOpts)) - app.get('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) - app.put('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) - app.delete('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) + app.get('/tenants/:tenantId/keys', { schema: tenantSchema }, tenantProxy) + app.get('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy) + app.put('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy) + app.delete('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy) - app.delete('/tenants/:tenantId', { schema: tenantSchema }, lookupAndDeregister(registry, { + app.delete('/tenants/:tenantId', { schema: tenantSchema }, app.coordinator.lookupAndDeregister({ destinationFrom: tenantFrom, notFoundMessage: 'tenant not found' })) app.post('/tenants/:tenantId/transactions', { schema: tenantSchema }, - lookupAndProxy(registry, proxyOpts)) + tenantProxy) - const lockFrom = (req: FastifyRequest): string => (req.params as LockParams).lockId - const lockProxy = lookupLockAndProxy(registry, { + const lockProxy = app.coordinator.lookupLockAndProxy({ lockFrom, notFoundMessage: 'transaction not found' }) @@ -103,4 +95,4 @@ async function coordinatorRoutes (app: FastifyInstance, opts: CoordinatorOptions app.post('/transactions/:lockId/rollback', { schema: lockSchema }, lockProxy) } -export const coordinatorPlugin = fp(coordinatorRoutes, { name: 'storage-db-coordinator' }) +export const storageDbCoordinatorPlugin = fp(storageDbRoutes, { name: 'storage-db-coordinator' }) diff --git a/package.json b/package.json index 1e3d368..5a97f8c 100644 --- a/package.json +++ b/package.json @@ -25,12 +25,13 @@ "build": "tsc", "test": "node --test --test-reporter=cleaner-spec-reporter test/*.test.ts", "test:e2e": "pnpm --filter @platformatic/coordinator run build && node --test --test-reporter=cleaner-spec-reporter --test-timeout=60000 test/e2e/*.test.ts", - "test:redis:up": "docker compose up -d --wait", - "test:redis:down": "docker compose down -v", + "test:deps:up": "docker compose up -d --wait", + "test:deps:down": "docker compose down -v", "lint": "eslint --cache" }, "dependencies": { "@fastify/reply-from": "^12.6.2", + "fastify-plugin": "^5.1.0", "iovalkey": "^0.3.3", "undici": "^7.23.0" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7705b48..66eb5fa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -11,6 +11,9 @@ importers: '@fastify/reply-from': specifier: ^12.6.2 version: 12.6.2 + fastify-plugin: + specifier: ^5.1.0 + version: 5.1.0 iovalkey: specifier: ^0.3.3 version: 0.3.3 @@ -42,9 +45,6 @@ importers: examples/storage-db: dependencies: - '@fastify/reply-from': - specifier: ^12.6.2 - version: 12.6.2 '@platformatic/coordinator': specifier: workspace:* version: link:../.. diff --git a/src/index.ts b/src/index.ts index 9b52f32..b6a4cdf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -32,3 +32,6 @@ export type { LookupLockAndProxyOptions, LookupLockAndProxyResult } from './help export { proxyVia } from './helpers/proxy-via.ts' export type { ProxyViaOptions, ProxyResolver, ProxyTarget } from './helpers/proxy-via.ts' + +export { default as coordinatorPlugin } from './plugin.ts' +export type { CoordinatorPluginOptions } from './plugin.ts' diff --git a/src/plugin.ts b/src/plugin.ts new file mode 100644 index 0000000..735aeb3 --- /dev/null +++ b/src/plugin.ts @@ -0,0 +1,96 @@ +import type { FastifyPluginAsync, RouteHandlerMethod } from 'fastify' +import fp from 'fastify-plugin' +import replyFrom, { type FastifyReplyFromOptions } from '@fastify/reply-from' +import { Registry, type RegistryOptions } from './registry.ts' +import { lookupAndProxy, type LookupAndProxyOptions } from './helpers/lookup-and-proxy.ts' +import { lookupLockAndProxy, type LookupLockAndProxyOptions } from './helpers/lookup-lock-and-proxy.ts' +import { pickAndRegister, type PickAndRegisterOptions } from './helpers/pick-and-register.ts' +import { lookupAndDeregister, type LookupAndDeregisterOptions } from './helpers/lookup-and-deregister.ts' +import { proxyVia, type ProxyResolver, type ProxyTarget, type ProxyViaOptions } from './helpers/proxy-via.ts' + +export interface Coordinator { + /** Underlying Registry instance; use it for non-Fastify operations (listLiveMembers, etc.). */ + registry: Registry + lookupAndProxy: (opts: LookupAndProxyOptions) => RouteHandlerMethod + lookupLockAndProxy: (opts: LookupLockAndProxyOptions) => RouteHandlerMethod + pickAndRegister: (opts: PickAndRegisterOptions) => RouteHandlerMethod + lookupAndDeregister: (opts: LookupAndDeregisterOptions) => RouteHandlerMethod + proxyVia: (resolve: ProxyResolver, opts?: ProxyViaOptions) => RouteHandlerMethod +} + +export interface CoordinatorPluginOptions extends Partial { + /** + * Reuse an existing Registry instance instead of creating one from the + * remaining options. When provided, the plugin does NOT close it on shutdown + * (lifecycle stays with the caller). + * + * When omitted, `redis` is required. + */ + registry?: Registry + /** + * Forwarded to `@fastify/reply-from` registration. + */ + replyFrom?: FastifyReplyFromOptions + /** + * Name of the decorator that exposes the Coordinator on the Fastify instance. + * Defaults to `coordinator`. + */ + decorateAs?: string + /** + * Set to `false` if the host app has already registered `@fastify/reply-from` + * (or a compatible plugin providing `reply.from`). Defaults to `true`. + */ + registerReplyFrom?: boolean +} + +declare module 'fastify' { + interface FastifyInstance { + coordinator: Coordinator + } +} + +const plugin: FastifyPluginAsync = async (app, opts) => { + const { + registry: externalRegistry, + replyFrom: replyFromOpts, + decorateAs = 'coordinator', + registerReplyFrom = true, + ...registryOpts + } = opts + + if (registerReplyFrom && !app.hasReplyDecorator('from')) { + if (replyFromOpts) { + await app.register(replyFrom, replyFromOpts) + } else { + await app.register(replyFrom) + } + } + + const ownsRegistry = !externalRegistry + if (!externalRegistry && !registryOpts.redis) { + throw new Error('coordinatorPlugin requires either `redis` or `registry`') + } + const registry = externalRegistry ?? new Registry(registryOpts as RegistryOptions) + + const coordinator: Coordinator = { + registry, + lookupAndProxy: (o) => lookupAndProxy(registry, o), + lookupLockAndProxy: (o) => lookupLockAndProxy(registry, o), + pickAndRegister: (o) => pickAndRegister(registry, o), + lookupAndDeregister: (o) => lookupAndDeregister(registry, o), + proxyVia: (resolve, o) => proxyVia(resolve, o) + } + + app.decorate(decorateAs, coordinator) + + if (ownsRegistry) { + app.addHook('onClose', async () => { await registry.close() }) + } +} + +export default fp(plugin, { + name: '@platformatic/coordinator', + fastify: '5.x' +}) + +export { plugin as coordinatorPlugin } diff --git a/test/e2e/storage-db.test.ts b/test/e2e/storage-db.test.ts index e45a63a..3e538c3 100644 --- a/test/e2e/storage-db.test.ts +++ b/test/e2e/storage-db.test.ts @@ -8,8 +8,8 @@ import { fileURLToPath } from 'node:url' import { Redis } from 'iovalkey' import pg from 'pg' -const REDIS_URL = process.env.REDIS_URL ?? 'redis://127.0.0.1:6379' -const PG_URL = process.env.PG_URL ?? 'postgresql://storage:storage@127.0.0.1:5432/storage' +const REDIS_URL = process.env.REDIS_URL ?? 'redis://127.0.0.1:6390' +const PG_URL = process.env.PG_URL ?? 'postgresql://storage:storage@127.0.0.1:15432/storage' const KEY_PREFIX = `e2e-${randomBytes(4).toString('hex')}` const COORDINATOR_PORT = 18080 diff --git a/test/plugin.test.ts b/test/plugin.test.ts new file mode 100644 index 0000000..4ed843a --- /dev/null +++ b/test/plugin.test.ts @@ -0,0 +1,159 @@ +import { strictEqual, ok } from 'node:assert' +import { randomBytes } from 'node:crypto' +import test from 'node:test' +import Fastify from 'fastify' +import { Redis } from 'iovalkey' +import coordinatorPlugin from '../src/plugin.ts' +import { Registry } from '../src/registry.ts' +import { REDIS_URL } from './redis-url.ts' + +const PREFIX = `test-plugin-${randomBytes(4).toString('hex')}` + +const membersKey = (): string => `${PREFIX}:members` +const memberKey = (id: string): string => `${PREFIX}:member:${id}` +const destinationKey = (id: string): string => `${PREFIX}:destination:${id}` +const lockKey = (id: string): string => `${PREFIX}:lock:${id}` + +async function makeLivePod (redis: Redis, memberId: string, address: string): Promise { + await redis.sadd(membersKey(), memberId) + await redis.hset(memberKey(memberId), { address, load: '0' }) + await redis.expire(memberKey(memberId), 60) +} + +interface MockPod { app: ReturnType, address: string } + +async function createMockPod (): Promise { + const app = Fastify() + + app.get('/items/:id', async (req) => ({ id: (req.params as any).id, served: true })) + app.post('/resources', async (req, reply) => { + return reply.code(201).send({ resourceId: `r-${randomBytes(3).toString('hex')}` }) + }) + app.delete('/items/:id', async (req, reply) => reply.code(204).send()) + app.post('/transactions/:lockId/echo', async (req) => ({ lockId: (req.params as any).lockId })) + + await app.listen({ port: 0, host: '127.0.0.1' }) + const addr = app.server.address() as any + return { app, address: `http://127.0.0.1:${addr.port}` } +} + +test('coordinatorPlugin', async (t) => { + const redis = new Redis(REDIS_URL) + const pod = await createMockPod() + await makeLivePod(redis, 'pod-1', pod.address) + + t.after(async () => { + await pod.app.close() + const stream = redis.scanStream({ match: `${PREFIX}:*`, count: 100 }) + for await (const keys of stream) { + if (keys.length > 0) await redis.del(...keys) + } + await redis.quit() + }) + + await t.test('registers reply-from and decorates the Coordinator', async () => { + const app = Fastify() + await app.register(coordinatorPlugin, { redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + + ok(app.hasReplyDecorator('from'), 'reply.from should be available') + ok(app.coordinator.registry instanceof Registry, 'app.coordinator.registry should be a Registry') + ok(typeof app.coordinator.lookupAndProxy === 'function') + ok(typeof app.coordinator.lookupLockAndProxy === 'function') + ok(typeof app.coordinator.pickAndRegister === 'function') + ok(typeof app.coordinator.lookupAndDeregister === 'function') + ok(typeof app.coordinator.proxyVia === 'function') + + await app.close() + }) + + await t.test('lookupAndProxy via decorator routes a request', async () => { + const app = Fastify() + await app.register(coordinatorPlugin, { redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + + app.get('/items/:id', app.coordinator.lookupAndProxy({ + destinationFrom: (req) => (req.params as any).id, + reassignOrphans: true, + claimOnMiss: true + })) + + const res = await app.inject({ method: 'GET', url: '/items/abc' }) + strictEqual(res.statusCode, 200) + const body = res.json() as any + strictEqual(body.id, 'abc') + strictEqual(body.served, true) + + const set = await redis.smembers(destinationKey('abc')) + strictEqual(set.length, 1) + strictEqual(set[0], 'pod-1') + + await app.close() + }) + + await t.test('pickAndRegister via decorator binds to a pod', async () => { + const app = Fastify() + await app.register(coordinatorPlugin, { redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + + app.post('/resources', app.coordinator.pickAndRegister({ + registerIdFrom: (res: any) => res.resourceId + })) + + const res = await app.inject({ method: 'POST', url: '/resources' }) + strictEqual(res.statusCode, 201) + const body = res.json() as any + ok(body.resourceId) + const set = await redis.smembers(destinationKey(body.resourceId)) + strictEqual(set.length, 1) + + await app.close() + }) + + await t.test('lookupLockAndProxy via decorator routes by lockId', async () => { + const lockId = `lock-${randomBytes(3).toString('hex')}` + await redis.hset(lockKey(lockId), { podId: 'pod-1', destinationId: 'tenant-x' }) + + const app = Fastify() + await app.register(coordinatorPlugin, { redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + + app.post('/transactions/:lockId/echo', app.coordinator.lookupLockAndProxy({ + lockFrom: (req) => (req.params as any).lockId + })) + + const res = await app.inject({ method: 'POST', url: `/transactions/${lockId}/echo` }) + strictEqual(res.statusCode, 200) + strictEqual((res.json() as any).lockId, lockId) + + await app.close() + }) + + await t.test('accepts an existing Registry and does not close it', async () => { + const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + const app = Fastify() + await app.register(coordinatorPlugin, { registry }) + + strictEqual(app.coordinator.registry, registry) + await app.close() + + const live = await registry.listLiveMembers() + ok(live.length >= 1) + await registry.close() + }) + + await t.test('decorateAs renames the decorator', async () => { + const app = Fastify() + await app.register(coordinatorPlugin, { + redis: REDIS_URL, keyPrefix: PREFIX, cache: false, decorateAs: 'router' + }) + ok((app as any).router?.registry instanceof Registry) + strictEqual((app as any).coordinator, undefined) + await app.close() + }) + + await t.test('registerReplyFrom=false skips reply-from registration', async () => { + const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) + const app = Fastify() + await app.register(coordinatorPlugin, { registry, registerReplyFrom: false }) + strictEqual(app.hasReplyDecorator('from'), false) + await app.close() + await registry.close() + }) +}) From 290d8c8d3689d9695955c86b1884bbd190eb7d35 Mon Sep 17 00:00:00 2001 From: marcopiraccini Date: Tue, 19 May 2026 11:32:31 +0200 Subject: [PATCH 2/2] ci: docker-smoke waits for 3 live pods, not just the coordinator port The previous wait step only checked that /pods returned 200, which happens immediately after the coordinator boots regardless of pod registration. On a slow runner the smoke script could then race ahead and see count=0, returning 503 on the first POST /tenants/. Now we poll until count >= 3. Co-Authored-By: Claude Opus 4.7 --- .github/workflows/ci.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f530c2e..1edfc0c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,16 +88,17 @@ jobs: - name: Bring up storage-db stack working-directory: examples/storage-db run: docker compose up --build -d --wait - - name: Wait for coordinator to be ready + - name: Wait for coordinator + 3 live pods run: | - for i in $(seq 1 30); do - if curl -sf http://127.0.0.1:8080/pods > /dev/null; then - echo "coordinator ready after ${i}s" + for i in $(seq 1 60); do + count=$(curl -sf http://127.0.0.1:8080/pods 2>/dev/null | jq -r '.count // 0') + if [ "${count:-0}" -ge 3 ]; then + echo "coordinator + 3 pods ready after ${i}s" exit 0 fi sleep 1 done - echo "coordinator did not become ready in 30s" + echo "coordinator + pods did not become ready in 60s (count=${count:-?})" docker compose -f examples/storage-db/docker-compose.yml logs exit 1 - name: Run smoke script