Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ jobs:
POSTGRES_USER: storage
POSTGRES_PASSWORD: storage
POSTGRES_DB: storage
# Map to host 15432 (matches docker-compose.yml so local devs avoid
# clashing with any other Postgres on 5432).
ports:
- 5432:5432
- 15432:5432
options: >-
--health-cmd "pg_isready -U storage -d storage"
--health-interval 10s
Expand All @@ -66,7 +68,7 @@ jobs:
node-version: [22, 24, 26]
env:
REDIS_URL: redis://127.0.0.1:6390
PG_URL: postgresql://storage:storage@127.0.0.1:5432/storage
PG_URL: postgresql://storage:storage@127.0.0.1:15432/storage
steps:
- uses: actions/checkout@v4
- uses: pnpm/action-setup@v4
Expand All @@ -86,16 +88,17 @@ jobs:
- name: Bring up storage-db stack
working-directory: examples/storage-db
run: docker compose up --build -d --wait
- name: Wait for coordinator to be ready
- name: Wait for coordinator + 3 live pods
run: |
for i in $(seq 1 30); do
if curl -sf http://127.0.0.1:8080/pods > /dev/null; then
echo "coordinator ready after ${i}s"
for i in $(seq 1 60); do
count=$(curl -sf http://127.0.0.1:8080/pods 2>/dev/null | jq -r '.count // 0')
if [ "${count:-0}" -ge 3 ]; then
echo "coordinator + 3 pods ready after ${i}s"
exit 0
fi
sleep 1
done
echo "coordinator did not become ready in 30s"
echo "coordinator + pods did not become ready in 60s (count=${count:-?})"
docker compose -f examples/storage-db/docker-compose.yml logs
exit 1
- name: Run smoke script
Expand Down
104 changes: 92 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,69 @@ This library handles all of that:
npm install @platformatic/coordinator
```

For the Fastify helpers, also:
Peer dependency: `fastify >= 5` when using the Fastify plugin or helpers. `@fastify/reply-from` is a runtime dependency of this package — you don't need to install it separately.

```sh
npm install @fastify/reply-from
## Quick start (Fastify plugin)

```ts
import Fastify from 'fastify'
import coordinatorPlugin from '@platformatic/coordinator'

const app = Fastify()

await app.register(coordinatorPlugin, {
redis: 'redis://valkey:6379',
keyPrefix: 'myservice',
strategy: 'least-loaded'
})

app.get('/destinations/:id/work', app.coordinator.lookupAndProxy({
destinationFrom: req => req.params.id,
claimOnMiss: true,
reassignOrphans: true
}))

app.post('/destinations', app.coordinator.pickAndRegister({
registerIdFrom: res => res.id
}))

app.delete('/destinations/:id', app.coordinator.lookupAndDeregister({
destinationFrom: req => req.params.id
}))

app.post('/transactions/:lockId/work', app.coordinator.lookupLockAndProxy({
lockFrom: req => req.params.lockId
}))

await app.listen({ port: 3000 })
```

Peer dependency: `fastify >= 5` when using the Fastify helpers.
The plugin:

- Registers `@fastify/reply-from` (idempotently — skipped if already registered)
- Constructs a `Registry` from the passed options (or reuses one you provide via `registry`)
- Exposes both the registry and the route-handler-factory helpers on `app.coordinator`
- Closes the registry on `app.close()` (unless you brought your own)

Options:

```ts
interface CoordinatorPluginOptions {
// Forwarded to new Registry(...) when no `registry` is supplied:
redis?: string // redis/valkey URL
keyPrefix?: string // default 'coordinator'
strategy?: 'round-robin' | 'least-loaded' | 'random' | AllocationStrategy
cache?: { ttl?: number, max?: number } | false
requestTimeout?: number

registry?: Registry // reuse an existing Registry (plugin will not close it)
decorateAs?: string // default 'coordinator'
replyFrom?: FastifyReplyFromOptions // forwarded to @fastify/reply-from
registerReplyFrom?: boolean // default true; set false if you already registered reply-from
}
```

The legacy standalone helper imports (`import { lookupAndProxy } from '@platformatic/coordinator'`) still work and are documented below. They require manually registering `@fastify/reply-from` and constructing the `Registry`.

## Valkey layout

Expand Down Expand Up @@ -148,9 +204,22 @@ Built-in least-loaded reads `load` from each candidate's member record (`HGET` p

`resolveDestination` checks a local LRU+TTL cache before reading Valkey. Default 5 s TTL, 10 000 entries. Configure with `cache: { ttl, max }` or disable with `cache: false`. Writes through the registry (`addPodToDestination`, `deregisterDestination`) evict the affected key. Each replica has its own cache.

## Fastify helpers
## Fastify helpers (standalone, advanced)

The helpers used internally by `app.coordinator.*` are also exported as standalone functions, for users who want to manage their own `Registry` and reply-from registration. Each emits a tagged result via an optional `onResult` callback so presets can hook their own metric counters.

For HTTP-based coordinators, three helpers wrap the common patterns. Each emits a tagged result via an optional `onResult` callback so presets can hook their own metric counters.
Before mounting any helper-backed route you must register `@fastify/reply-from` (the `coordinatorPlugin` does this for you):

```ts
import Fastify from 'fastify'
import replyFrom from '@fastify/reply-from'
import { Registry, lookupAndProxy } from '@platformatic/coordinator'

const app = Fastify()
await app.register(replyFrom)
const registry = new Registry({ redis, keyPrefix })
app.get('/x/:id', lookupAndProxy(registry, { destinationFrom: r => r.params.id }))
```

### `lookupAndProxy`

Expand Down Expand Up @@ -184,19 +253,30 @@ app.delete('/destinations/:id', lookupAndDeregister(registry, {

Resolves, proxies the delete; on `expectedStatus` (204 by default), `DEL`s the destination set. If the destination has only dead pods, skips the proxy and just deletes the set ("deregistered_dead_pod").

All three helpers go through `@fastify/reply-from`, which the host application must register once before any helper-backed route is mounted.
All four helpers go through `@fastify/reply-from`. The `coordinatorPlugin` registers it automatically; if you use the standalone helpers, you must register it yourself.

### `lookupLockAndProxy`

```ts
app.post('/transactions/:lockId/work', lookupLockAndProxy(registry, {
lockFrom: req => req.params.lockId
}))
```

Resolves the lockId to the pod that owns it (via `Registry.resolveLock`) and proxies through. 404s if the lockId is unknown.

## Testing

Tests use Redis on `127.0.0.1:6390`. A `docker-compose.yml` is included.
Unit tests need a Redis on `127.0.0.1:6390`. E2E tests also need a Postgres on `127.0.0.1:15432` (storage/storage/storage). Both are in the included `docker-compose.yml`.

```sh
pnpm run test:redis:up
pnpm test
pnpm run test:redis:down
pnpm run test:deps:up # brings up redis + postgres
pnpm test # unit tests
pnpm run test:e2e # end-to-end tests (uses the storage-db example)
pnpm run test:deps:down
```

The URL is read from `REDIS_URL` (default `redis://127.0.0.1:6390`). Tests isolate keys with a random prefix and clean up after themselves.
URLs are read from `REDIS_URL` (default `redis://127.0.0.1:6390`) and `PG_URL` (default `postgresql://storage:storage@127.0.0.1:15432/storage`). Tests isolate keys with a random prefix and clean up after themselves.

## License

Expand Down
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ services:
interval: 2s
timeout: 2s
retries: 10

postgres:
image: postgres:17-alpine
container_name: coordinator-test-postgres
ports:
- "15432:5432"
environment:
POSTGRES_USER: storage
POSTGRES_PASSWORD: storage
POSTGRES_DB: storage
healthcheck:
test: ["CMD-SHELL", "pg_isready -U storage -d storage"]
interval: 2s
timeout: 2s
retries: 10
1 change: 0 additions & 1 deletion examples/storage-db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
"smoke": "./scripts/smoke.sh"
},
"dependencies": {
"@fastify/reply-from": "^12.6.2",
"@platformatic/coordinator": "workspace:*",
"fastify": "^5.3.2",
"fastify-plugin": "^5.0.1",
Expand Down
4 changes: 2 additions & 2 deletions examples/storage-db/src/bin/coordinator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Fastify from 'fastify'
import { Registry } from '@platformatic/coordinator'
import { coordinatorPlugin } from '../coordinator-plugin.ts'
import { storageDbCoordinatorPlugin } from '../coordinator-plugin.ts'

const env = (key: string, fallback?: string): string => {
const v = process.env[key] ?? fallback
Expand All @@ -23,7 +23,7 @@ const registry = new Registry({
})

const app = Fastify({ logger: { level: process.env.LOG_LEVEL ?? 'info' } })
await app.register(coordinatorPlugin, { registry })
await app.register(storageDbCoordinatorPlugin, { registry })

const shutdown = async (): Promise<void> => {
try { await registry.close() } catch { /* ignore */ }
Expand Down
40 changes: 16 additions & 24 deletions examples/storage-db/src/coordinator-plugin.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import fp from 'fastify-plugin'
import replyFrom from '@fastify/reply-from'
import type { FastifyInstance, FastifyRequest } from 'fastify'
import {
Registry,
lookupAndProxy,
pickAndRegister,
lookupAndDeregister,
lookupLockAndProxy
} from '@platformatic/coordinator'
import { type Registry, coordinatorPlugin } from '@platformatic/coordinator'

interface TenantParams { tenantId: string }
interface LockParams { lockId: string }
Expand Down Expand Up @@ -54,45 +47,44 @@ const lockKeySchema = {
}
} as const

async function coordinatorRoutes (app: FastifyInstance, opts: CoordinatorOptions): Promise<void> {
const { registry } = opts
await app.register(replyFrom)
async function storageDbRoutes (app: FastifyInstance, opts: CoordinatorOptions): Promise<void> {
await app.register(coordinatorPlugin, { registry: opts.registry })

const tenantFrom = (req: FastifyRequest): string => (req.params as TenantParams).tenantId
const lockFrom = (req: FastifyRequest): string => (req.params as LockParams).lockId

app.get('/pods', async () => {
const members = await registry.listLiveMembers()
const members = await app.coordinator.registry.listLiveMembers()
return { count: members.length, members }
})

app.post('/tenants/:tenantId', { schema: tenantSchema }, pickAndRegister(registry, {
app.post('/tenants/:tenantId', { schema: tenantSchema }, app.coordinator.pickAndRegister({
registerIdFrom: (body: any) => body.tenantId,
expectedStatus: 201,
unavailableMessage: 'no pods available'
}))

const proxyOpts = {
const tenantProxy = app.coordinator.lookupAndProxy({
destinationFrom: tenantFrom,
reassignOrphans: true,
notFoundMessage: 'tenant not found'
}
})

app.get('/tenants/:tenantId/keys', { schema: tenantSchema }, lookupAndProxy(registry, proxyOpts))
app.get('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts))
app.put('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts))
app.delete('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts))
app.get('/tenants/:tenantId/keys', { schema: tenantSchema }, tenantProxy)
app.get('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy)
app.put('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy)
app.delete('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, tenantProxy)

app.delete('/tenants/:tenantId', { schema: tenantSchema }, lookupAndDeregister(registry, {
app.delete('/tenants/:tenantId', { schema: tenantSchema }, app.coordinator.lookupAndDeregister({
destinationFrom: tenantFrom,
notFoundMessage: 'tenant not found'
}))

app.post('/tenants/:tenantId/transactions',
{ schema: tenantSchema },
lookupAndProxy(registry, proxyOpts))
tenantProxy)

const lockFrom = (req: FastifyRequest): string => (req.params as LockParams).lockId
const lockProxy = lookupLockAndProxy(registry, {
const lockProxy = app.coordinator.lookupLockAndProxy({
lockFrom,
notFoundMessage: 'transaction not found'
})
Expand All @@ -103,4 +95,4 @@ async function coordinatorRoutes (app: FastifyInstance, opts: CoordinatorOptions
app.post('/transactions/:lockId/rollback', { schema: lockSchema }, lockProxy)
}

export const coordinatorPlugin = fp(coordinatorRoutes, { name: 'storage-db-coordinator' })
export const storageDbCoordinatorPlugin = fp(storageDbRoutes, { name: 'storage-db-coordinator' })
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
"build": "tsc",
"test": "node --test --test-reporter=cleaner-spec-reporter test/*.test.ts",
"test:e2e": "pnpm --filter @platformatic/coordinator run build && node --test --test-reporter=cleaner-spec-reporter --test-timeout=60000 test/e2e/*.test.ts",
"test:redis:up": "docker compose up -d --wait",
"test:redis:down": "docker compose down -v",
"test:deps:up": "docker compose up -d --wait",
"test:deps:down": "docker compose down -v",
"lint": "eslint --cache"
},
"dependencies": {
"@fastify/reply-from": "^12.6.2",
"fastify-plugin": "^5.1.0",
"iovalkey": "^0.3.3",
"undici": "^7.23.0"
},
Expand Down
6 changes: 3 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ export type { LookupLockAndProxyOptions, LookupLockAndProxyResult } from './help

export { proxyVia } from './helpers/proxy-via.ts'
export type { ProxyViaOptions, ProxyResolver, ProxyTarget } from './helpers/proxy-via.ts'

export { default as coordinatorPlugin } from './plugin.ts'
export type { CoordinatorPluginOptions } from './plugin.ts'
Loading
Loading