diff --git a/.bounty_pr.json b/.bounty_pr.json new file mode 100644 index 0000000..16c5fc6 --- /dev/null +++ b/.bounty_pr.json @@ -0,0 +1,18 @@ +{ + "status": "ready", + "commit_message": "feat(plugins): add Replicator plugin to mirror external data to internal store", + "pr_title": "feat(plugins): add Replicator plugin to mirror external data", + "pr_body": "## Purpose\n\nCloses #72.\n\nAdds a new `ReplicatorPlugin` under `plugins/replicator/` that pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into the StarbaseDB internal SQLite store. Each replication pass is driven by a per-table watermark column (e.g. `updated_at` or a monotonic `id`) so only rows that changed since the previous run are transferred.\n\n## What changed\n\n- `plugins/replicator/index.ts` — new `ReplicatorPlugin` extending `StarbasePlugin`. Creates `tmp_replication_state(table_name, last_value, last_synced_at)` on registration, exposes `sync()` and `POST /replicator/sync` (admin-only), and upserts rows via `INSERT ... ON CONFLICT(primaryKey) DO UPDATE`.\n- Watermark tracking compares **numerically** when both sides parse as numbers, so a monotonic integer `id` column no longer falls into the lexicographic trap (e.g. `\"99\" > \"100\"`). 
String compare is used otherwise, which still handles ISO timestamps correctly.\n- Identifiers (`name`, `watermarkColumn`, `primaryKey`, `destTable`) are validated against `[A-Za-z_][A-Za-z0-9_]*` at construction time and quoted in the external `SELECT` using dialect-appropriate quoting (backticks for MySQL, double quotes elsewhere) — consistent with the existing double-quoted destination-side identifiers.\n- `plugins/replicator/index.test.ts` — vitest suite covering constructor validation, identifier validation, state-table creation, initial pull, watermark-bounded pulls, dest-table override, MySQL dialect quoting, and numeric-watermark ordering.\n- `plugins/replicator/README.md` — usage, configuration, Cron-plugin scheduling snippet, destination-table DDL template, and a note that large backfills require repeated `sync()` calls because each call is bounded by `batchSize`.\n- `plugins/replicator/meta.json` — registry metadata to match the other plugins.\n\n## How it works\n\n1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)`.\n2. Each `sync()` call reads the stored watermark per table, runs `SELECT * FROM \"<table>\" WHERE \"<watermarkColumn>\" > ? ORDER BY \"<watermarkColumn>\" ASC LIMIT <batchSize>` against the external source, and upserts each row into the internal store using `ON CONFLICT(<primaryKey>) DO UPDATE`.\n3. 
After the batch, the highest watermark seen (numeric or lexicographic depending on the value type) becomes the new stored `last_value`.\n\nScheduling is delegated to the existing [Cron plugin](/plugins/cron/README.md) — the README shows the snippet.\n\n## Tasks\n\n- [x] Implement the plugin\n- [x] Add unit tests\n- [x] Document usage, scheduling, and destination-table bootstrapping\n- [x] Validate identifiers and quote them in the external SELECT\n- [x] Compare numeric watermarks numerically\n\n## Verify\n\n- `npx vitest run plugins/replicator/index.test.ts` — 11/11 passing.\n- The 4 failures in `src/rls/index.test.ts` from a full `npx vitest run` are pre-existing on `main` and unrelated to this change (confirmed by running the RLS suite on `main`).\n\n## Before\n\n- [x] Branch contains exactly one commit, scoped to `plugins/replicator/*`.\n- [x] No edits to unrelated files.\n", + "branch": "fix/issue-72-starbasedb-replicate-data-from-external", + "tests_run": [ + "npx vitest run plugins/replicator/index.test.ts" + ], + "tests_passed": true, + "files_changed": [ + "plugins/replicator/README.md", + "plugins/replicator/index.test.ts", + "plugins/replicator/index.ts", + "plugins/replicator/meta.json" + ], + "notes": "Branch rebased onto main: dropped the 2 unrelated export/dump commits (0034b65, 34b3927) that were on the prior draft, leaving a single replicator commit. Pre-existing failures in src/rls/index.test.ts (4 tests) exist on main and are unrelated to this change." +} diff --git a/plugins/replicator/README.md b/plugins/replicator/README.md new file mode 100644 index 0000000..e7dbb79 --- /dev/null +++ b/plugins/replicator/README.md @@ -0,0 +1,127 @@ +# Replicator Plugin + +The Replicator Plugin pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into StarbaseDB's internal SQLite store on demand. Each replication pass uses a per-table watermark column (e.g. 
`updated_at` or a monotonic `id`) so that only rows that changed since the previous run are transferred.
+
+## Usage
+
+```ts
+import { ReplicatorPlugin } from '../plugins/replicator'
+
+const replicatorPlugin = new ReplicatorPlugin({
+    external: {
+        dialect: 'postgresql',
+        host: env.EXTERNAL_DB_HOST!,
+        port: env.EXTERNAL_DB_PORT!,
+        user: env.EXTERNAL_DB_USER!,
+        password: env.EXTERNAL_DB_PASS!,
+        database: env.EXTERNAL_DB_DATABASE!,
+    },
+    tables: [
+        {
+            name: 'users',
+            watermarkColumn: 'updated_at',
+            primaryKey: 'id',
+        },
+        {
+            name: 'orders',
+            watermarkColumn: 'id',
+            primaryKey: 'id',
+            destTable: 'orders_mirror',
+        },
+    ],
+    batchSize: 500,
+})
+
+const plugins = [
+    replicatorPlugin,
+    // ... other plugins
+] satisfies StarbasePlugin[]
+```
+
+## How To Use
+
+Trigger a replication pass with an admin-authorized POST request:
+
+```bash
+curl -X POST https://starbasedb.YOUR-ID-HERE.workers.dev/replicator/sync \
+    -H "Authorization: Bearer $ADMIN_AUTHORIZATION_TOKEN"
+```
+
+Each call replicates at most `batchSize` rows per table, so the initial
+backfill of a large table will require several invocations until every
+table reports `rowsReplicated: 0`.
+
+### Bootstrapping the destination table
+
+The replicator does not migrate schemas. Create the destination table on
+the StarbaseDB side before the first sync — for example:
+
+```sql
+CREATE TABLE IF NOT EXISTS users (
+    id INTEGER PRIMARY KEY,
+    name TEXT,
+    updated_at TEXT NOT NULL
+);
+```
+
+The primary key column you pass to the plugin must be the `PRIMARY KEY`
+(or have a `UNIQUE` index) so that `ON CONFLICT(...) DO UPDATE` works.
+
+### Scheduling with the Cron plugin
+
+Pair the replicator with the [Cron plugin](/plugins/cron/README.md) so
+that `sync()` runs on a schedule:
+
+```ts
+import { CronPlugin } from '../plugins/cron'
+import { ReplicatorPlugin } from '../plugins/replicator'
+
+const replicatorPlugin = new ReplicatorPlugin({
+    /* ...as above... 
*/
+})
+
+const cronPlugin = new CronPlugin()
+cronPlugin.onEvent(async ({ name }) => {
+    if (name === 'Replicate every minute') {
+        await replicatorPlugin.sync()
+    }
+}, ctx)
+
+const plugins = [
+    cronPlugin,
+    replicatorPlugin,
+] satisfies StarbasePlugin[]
+```
+
+Then insert a matching task row into `tmp_cron_tasks` (see the Cron
+plugin README for the schema).
+
+## Configuration Options
+
+| Option       | Type                     | Default       | Description                                                     |
+| ------------ | ------------------------ | ------------- | --------------------------------------------------------------- |
+| `external`   | `ExternalDatabaseSource` | required      | Connection details for the external database to replicate from. |
+| `tables`     | `ReplicationTable[]`     | required      | List of tables to replicate. See below.                         |
+| `batchSize`  | `number`                 | `1000`        | Maximum number of rows to pull per table per `sync()` call.     |
+| `pathPrefix` | `string`                 | `/replicator` | URL prefix for the plugin's HTTP routes.                        |
+
+### `ReplicationTable`
+
+| Field             | Type     | Default  | Description                                                                           |
+| ----------------- | -------- | -------- | ------------------------------------------------------------------------------------- |
+| `name`            | `string` | required | The table name in the external source.                                                |
+| `watermarkColumn` | `string` | required | Column used to track replication progress (e.g. `updated_at`, monotonic `id`).        |
+| `primaryKey`      | `string` | required | Column used for upserting rows on the destination side.                               |
+| `destTable`       | `string` | `name`   | (optional) The destination table name inside StarbaseDB. Defaults to the source name. |
+
+## How It Works
+
+1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)` to track the most recent watermark seen per table.
+2. On each `sync()` call the plugin reads the stored watermark, runs `SELECT * FROM <table> WHERE <watermarkColumn> > <last_value> ORDER BY <watermarkColumn> ASC LIMIT <batchSize>` against the external source, and upserts the rows into the internal SQLite store using `ON CONFLICT(<primaryKey>) DO UPDATE`.
+3. After all rows are written, the highest watermark observed becomes the new stored `last_value`. Watermark comparison is numeric when both sides parse as numbers (so `id = 100` correctly ranks above `id = 99`) and lexicographic otherwise (which already handles ISO timestamps such as `updated_at`).
+
+> [!NOTE]
+> The destination table must already exist with the matching schema (including the primary key). The replicator does not create or migrate tables on the StarbaseDB side.
+
+> [!NOTE]
+> Table and column identifiers are validated at construction time and must match `[A-Za-z_][A-Za-z0-9_]*`. Identifiers containing spaces, hyphens, quotes or reserved characters are not supported.
diff --git a/plugins/replicator/index.test.ts b/plugins/replicator/index.test.ts
new file mode 100644
index 0000000..e5af49c
--- /dev/null
+++ b/plugins/replicator/index.test.ts
@@ -0,0 +1,368 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest'
+
+vi.mock('../../src/operation', () => ({
+    executeSDKQuery: vi.fn(),
+}))
+
+import { ReplicatorPlugin } from './index'
+import { executeSDKQuery } from '../../src/operation'
+import { DataSource, ExternalDatabaseSource } from '../../src/types'
+
+const mockExecuteSDKQuery = executeSDKQuery as unknown as ReturnType<
+    typeof vi.fn
+>
+
+const externalSource: ExternalDatabaseSource = {
+    dialect: 'postgresql',
+    host: 'localhost',
+    port: 5432,
+    user: 'user',
+    password: 'pass',
+    database: 'db',
+}
+
+let dataSource: DataSource
+let executeQuery: ReturnType<typeof vi.fn>
+
+function buildDataSource() {
+    executeQuery = vi.fn().mockResolvedValue([])
+    return {
+        rpc: { executeQuery },
+        source: 'internal',
+    } as unknown as DataSource
+}
+
+beforeEach(() => {
+    vi.clearAllMocks()
+    dataSource = buildDataSource()
+})
+
+describe('ReplicatorPlugin - constructor', () => {
+    it('throws when no external 
source is provided', () => {
+        expect(
+            () =>
+                new ReplicatorPlugin({
+                    // @ts-expect-error testing runtime validation
+                    external: undefined,
+                    tables: [
+                        {
+                            name: 'users',
+                            watermarkColumn: 'updated_at',
+                            primaryKey: 'id',
+                        },
+                    ],
+                })
+        ).toThrow(/external source is required/)
+    })
+
+    it('throws when no tables are provided', () => {
+        expect(
+            () =>
+                new ReplicatorPlugin({
+                    external: externalSource,
+                    tables: [],
+                })
+        ).toThrow(/At least one table/)
+    })
+
+    it('throws when a table is missing a required field', () => {
+        expect(
+            () =>
+                new ReplicatorPlugin({
+                    external: externalSource,
+                    tables: [
+                        // @ts-expect-error testing runtime validation
+                        { name: 'users', watermarkColumn: 'updated_at' },
+                    ],
+                })
+        ).toThrow(/name, watermarkColumn and primaryKey/)
+    })
+})
+
+describe('ReplicatorPlugin - register()', () => {
+    it('creates the replication state table on registration', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        // Capture whatever middleware register() installs so it can be
+        // invoked by hand below.
+        const middlewares: Array<(c: any, next: any) => Promise<void>> = []
+        const mockApp = {
+            use: vi.fn((mw) => middlewares.push(mw)),
+            post: vi.fn(),
+        } as any
+
+        await plugin.register(mockApp)
+
+        // Run the registered middleware to trigger init().
+        await middlewares[0](
+            {
+                get: (key: string) =>
+                    key === 'dataSource' ? dataSource : undefined,
+            },
+            vi.fn()
+        )
+
+        expect(executeQuery).toHaveBeenCalledWith({
+            sql: expect.stringContaining(
+                'CREATE TABLE IF NOT EXISTS tmp_replication_state'
+            ),
+            params: [],
+        })
+    })
+})
+
+describe('ReplicatorPlugin - sync()', () => {
+    it('pulls rows from external and upserts them into internal storage', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        // Every internal query (including the watermark lookup) returns no
+        // rows, i.e. there is no stored watermark yet.
+        executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+            if (sql.includes('SELECT last_value')) return []
+            return []
+        })
+
+        mockExecuteSDKQuery.mockResolvedValueOnce([
+            { id: 1, name: 'Alice', updated_at: '2024-01-01T00:00:00Z' },
+            { id: 2, name: 'Bob', updated_at: '2024-01-02T00:00:00Z' },
+        ])
+
+        // Inject the data source directly instead of going through the
+        // register() middleware.
+        ;(plugin as any).dataSource = dataSource
+
+        const results = await plugin.sync()
+
+        expect(mockExecuteSDKQuery).toHaveBeenCalledTimes(1)
+        const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+        expect(sdkCallArgs.sql).toContain('SELECT * FROM "users"')
+        expect(sdkCallArgs.sql).toContain('ORDER BY "updated_at" ASC')
+        // No prior watermark — should not include a WHERE clause.
+        expect(sdkCallArgs.sql).not.toContain('WHERE')
+
+        const upsertCalls = executeQuery.mock.calls.filter((c: any[]) =>
+            c[0].sql.includes('INSERT INTO "users"')
+        )
+        expect(upsertCalls).toHaveLength(2)
+        expect(upsertCalls[0][0].sql).toContain(
+            'ON CONFLICT("id") DO UPDATE SET'
+        )
+
+        const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+            c[0].sql.includes('INSERT INTO tmp_replication_state')
+        )
+        expect(stateUpdates).toHaveLength(1)
+        expect(stateUpdates[0][0].params[0]).toBe('users')
+        expect(stateUpdates[0][0].params[1]).toBe('2024-01-02T00:00:00Z')
+
+        expect(results).toEqual([
+            {
+                table: 'users',
+                rowsReplicated: 2,
+                lastValue: '2024-01-02T00:00:00Z',
+            },
+        ])
+    })
+
+    it('uses the stored watermark to fetch only newer rows', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+            if (sql.includes('SELECT last_value')) {
+                return [{ last_value: '2024-01-02T00:00:00Z' }]
+            }
+            return []
+        })
+
+        mockExecuteSDKQuery.mockResolvedValueOnce([])
+        ;(plugin as any).dataSource = dataSource
+
+        const results = await plugin.sync()
+
+        const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+        expect(sdkCallArgs.sql).toContain('WHERE "updated_at" > ?')
+        expect(sdkCallArgs.params).toEqual(['2024-01-02T00:00:00Z'])
+
+        expect(results).toEqual([
+            {
+                table: 'users',
+                rowsReplicated: 0,
+                lastValue: '2024-01-02T00:00:00Z',
+            },
+        ])
+
+        // No new rows were returned, so the state table should not be updated.
+        const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+            c[0].sql.includes('INSERT INTO tmp_replication_state')
+        )
+        expect(stateUpdates).toHaveLength(0)
+    })
+
+    it('writes rows to the configured destination table', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'orders',
+                    watermarkColumn: 'id',
+                    primaryKey: 'id',
+                    destTable: 'orders_mirror',
+                },
+            ],
+        })
+
+        executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+            if (sql.includes('SELECT last_value')) return []
+            return []
+        })
+
+        mockExecuteSDKQuery.mockResolvedValueOnce([{ id: 10, total: 99.5 }])
+        ;(plugin as any).dataSource = dataSource
+
+        await plugin.sync()
+
+        const upsertCall = executeQuery.mock.calls.find((c: any[]) =>
+            c[0].sql.startsWith('INSERT INTO "orders_mirror"')
+        )
+        expect(upsertCall).toBeDefined()
+    })
+
+    it('throws when the plugin has not been initialized', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        // No dataSource was injected, so sync() must reject.
+        await expect(plugin.sync()).rejects.toThrow(/not properly initialized/)
+    })
+
+    it('tracks numeric watermarks numerically (not lexicographically)', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'orders',
+                    watermarkColumn: 'id',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+            if (sql.includes('SELECT last_value')) return []
+            return []
+        })
+
+        // Rows arrive in ASC order. A naive string compare would say
+        // "9" > "10" > "100", leaving 9 as the watermark and re-pulling rows
+        // forever. The numeric path should pick 100.
+        mockExecuteSDKQuery.mockResolvedValueOnce([
+            { id: 9 },
+            { id: 10 },
+            { id: 100 },
+        ])
+
+        ;(plugin as any).dataSource = dataSource
+
+        const results = await plugin.sync()
+
+        expect(results[0].lastValue).toBe('100')
+        const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+            c[0].sql.includes('INSERT INTO tmp_replication_state')
+        )
+        expect(stateUpdates).toHaveLength(1)
+        expect(stateUpdates[0][0].params[1]).toBe('100')
+    })
+
+    it('uses backtick quoting for the mysql dialect', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: {
+                dialect: 'mysql',
+                host: 'localhost',
+                port: 3306,
+                user: 'u',
+                password: 'p',
+                database: 'd',
+            },
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+            if (sql.includes('SELECT last_value')) return []
+            return []
+        })
+        mockExecuteSDKQuery.mockResolvedValueOnce([])
+        ;(plugin as any).dataSource = dataSource
+
+        await plugin.sync()
+
+        const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+        expect(sdkCallArgs.sql).toContain('SELECT * FROM `users`')
+        expect(sdkCallArgs.sql).toContain('ORDER BY `updated_at` ASC')
+    })
+})
+
+describe('ReplicatorPlugin - identifier validation', () => {
+    it('rejects unsafe identifiers in the constructor', () => {
+        expect(
+            () =>
+                new ReplicatorPlugin({
+                    external: externalSource,
+                    tables: [
+                        {
+                            name: 'users; DROP TABLE foo',
+                            watermarkColumn: 'updated_at',
+                            primaryKey: 'id',
+                        },
+                    ],
+                })
+        ).toThrow(/Invalid table name/)
+
+        expect(
+            () =>
+                new ReplicatorPlugin({
+                    external: externalSource,
+                    tables: [
+                        {
+                            name: 'users',
+                            watermarkColumn: 'updated at',
+                            primaryKey: 'id',
+                        },
+                    ],
+                })
+        ).toThrow(/Invalid watermarkColumn/)
+    })
+})
diff --git a/plugins/replicator/index.ts b/plugins/replicator/index.ts
new file mode 100644
index 0000000..8815372
--- /dev/null
+++ b/plugins/replicator/index.ts
@@ -0,0 +1,325 @@
+import { 
StarbaseApp, StarbaseDBConfiguration } from '../../src/handler'
+import { StarbasePlugin } from '../../src/plugin'
+import { executeSDKQuery } from '../../src/operation'
+import { DataSource, ExternalDatabaseSource } from '../../src/types'
+import { createResponse } from '../../src/utils'
+
+// Conservative identifier guard: bare letters, digits and underscores only.
+// Anything else (spaces, hyphens, quotes, reserved characters) is rejected at
+// construction time so we never have to inject untrusted identifiers into SQL.
+const IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/
+
+// Plain base-10 integers (optionally negative). Watermarks of this shape are
+// compared as BigInt so ids above Number.MAX_SAFE_INTEGER keep their ordering.
+const INTEGER_PATTERN = /^-?\d+$/
+
+/**
+ * Throw unless `value` matches IDENTIFIER_PATTERN.
+ *
+ * @param value identifier to check
+ * @param label human-readable field name used in the error message
+ * @throws Error when the identifier contains anything but [A-Za-z0-9_] or
+ *   starts with a digit
+ */
+function validateIdentifier(value: string, label: string) {
+    if (!IDENTIFIER_PATTERN.test(value)) {
+        throw new Error(
+            `Invalid ${label} "${value}". Only [A-Za-z0-9_] identifiers starting with a letter or underscore are supported.`
+        )
+    }
+}
+
+/**
+ * Quote an already-validated identifier for the external SELECT.
+ *
+ * @param name identifier that passed validateIdentifier
+ * @param dialect dialect of the external source
+ * @returns the identifier wrapped in backticks (mysql) or double quotes
+ */
+function quoteExternalIdentifier(
+    name: string,
+    dialect: ExternalDatabaseSource['dialect']
+): string {
+    // MySQL uses backticks; postgresql / sqlite use double quotes. Identifiers
+    // are validated at construction time so the value here is always safe.
+    return dialect === 'mysql' ? `\`${name}\`` : `"${name}"`
+}
+
+/**
+ * Quote an identifier for the internal SQLite store. Column names come from
+ * the keys of rows returned by the external source and are not validated, so
+ * embedded double quotes are doubled to keep them inside the identifier.
+ */
+function quoteInternalIdentifier(name: string): string {
+    return `"${name.replace(/"/g, '""')}"`
+}
+
+const SQL_QUERIES = {
+    CREATE_STATE_TABLE: `
+        CREATE TABLE IF NOT EXISTS tmp_replication_state (
+            table_name TEXT NOT NULL PRIMARY KEY,
+            last_value TEXT,
+            last_synced_at DATETIME
+        )
+    `,
+    GET_LAST_VALUE: `
+        SELECT last_value FROM tmp_replication_state WHERE table_name = ?
+    `,
+    UPSERT_STATE: `
+        INSERT INTO tmp_replication_state (table_name, last_value, last_synced_at)
+        VALUES (?, ?, CURRENT_TIMESTAMP)
+        ON CONFLICT(table_name) DO UPDATE SET
+            last_value = excluded.last_value,
+            last_synced_at = CURRENT_TIMESTAMP
+    `,
+    GET_ALL_STATE: `
+        SELECT table_name, last_value, last_synced_at FROM tmp_replication_state
+    `,
+}
+
+export interface ReplicationTable {
+    // Name of the table in the external source
+    name: string
+    // Column used to track replication progress (e.g. updated_at, id)
+    watermarkColumn: string
+    // Primary key column used to upsert rows into the internal table
+    primaryKey: string
+    // Optional name for the destination table inside StarbaseDB
+    destTable?: string
+}
+
+export interface ReplicationResult {
+    table: string
+    rowsReplicated: number
+    lastValue: string | null
+}
+
+/**
+ * Pulls rows from an external database into the internal store, one bounded
+ * batch per table per `sync()` call, tracking progress per source table in
+ * `tmp_replication_state`.
+ */
+export class ReplicatorPlugin extends StarbasePlugin {
+    public pathPrefix: string = '/replicator'
+    private dataSource?: DataSource
+    private external: ExternalDatabaseSource
+    private tables: ReplicationTable[]
+    private batchSize: number
+    private config?: StarbaseDBConfiguration
+
+    /**
+     * @param opts.external connection details of the source database
+     * @param opts.tables tables to replicate; every identifier is validated
+     * @param opts.batchSize max rows pulled per table per sync (default 1000)
+     * @param opts.pathPrefix URL prefix for the HTTP route (default /replicator)
+     * @throws Error when `external` or `tables` is missing, a table lacks a
+     *   required field, an identifier is unsafe, or `batchSize` is not a
+     *   positive integer
+     */
+    constructor(opts: {
+        external: ExternalDatabaseSource
+        tables: ReplicationTable[]
+        batchSize?: number
+        pathPrefix?: string
+    }) {
+        super('starbasedb:replicator', {
+            requiresAuth: true,
+        })
+
+        if (!opts?.external) {
+            throw new Error(
+                'An external source is required for the Replicator plugin.'
+            )
+        }
+
+        if (!opts?.tables?.length) {
+            throw new Error(
+                'At least one table must be configured for the Replicator plugin.'
+            )
+        }
+
+        for (const table of opts.tables) {
+            if (!table.name || !table.watermarkColumn || !table.primaryKey) {
+                throw new Error(
+                    'Each replication table requires name, watermarkColumn and primaryKey.'
+                )
+            }
+            validateIdentifier(table.name, 'table name')
+            validateIdentifier(table.watermarkColumn, 'watermarkColumn')
+            validateIdentifier(table.primaryKey, 'primaryKey')
+            if (table.destTable) {
+                validateIdentifier(table.destTable, 'destTable')
+            }
+        }
+
+        // batchSize is interpolated directly into the LIMIT clause, so it has
+        // to be a real positive integer before it gets anywhere near SQL.
+        const batchSize = opts.batchSize ?? 1000
+        if (!Number.isInteger(batchSize) || batchSize <= 0) {
+            throw new Error(
+                'batchSize must be a positive integer for the Replicator plugin.'
+            )
+        }
+
+        this.external = opts.external
+        this.tables = opts.tables
+        this.batchSize = batchSize
+        if (opts.pathPrefix) this.pathPrefix = opts.pathPrefix
+    }
+
+    /**
+     * Install the middleware that captures the request's data source and
+     * config (and ensures the state table exists), plus the
+     * `POST <pathPrefix>/sync` route.
+     */
+    override async register(app: StarbaseApp) {
+        app.use(async (c, next) => {
+            this.dataSource = c?.get('dataSource')
+            this.config = c?.get('config')
+            // Runs on every request; the statement is CREATE TABLE IF NOT
+            // EXISTS, so repeated calls are harmless.
+            await this.init()
+            await next()
+        })
+
+        app.post(`${this.pathPrefix}/sync`, async (c) => {
+            // Only admin authorized users may trigger replication.
+            if (this.config?.role !== 'admin') {
+                return createResponse(undefined, 'Unauthorized request', 401)
+            }
+
+            try {
+                const results = await this.sync()
+                return createResponse(
+                    { success: true, results },
+                    undefined,
+                    200
+                )
+            } catch (error: unknown) {
+                console.error('Replication error:', error)
+                const message =
+                    error instanceof Error ? error.message : String(error)
+                return createResponse(
+                    undefined,
+                    `Replication failed: ${message}`,
+                    500
+                )
+            }
+        })
+    }
+
+    /** Create the replication state table if a data source is available. */
+    private async init() {
+        if (!this.dataSource) return
+
+        await this.dataSource.rpc.executeQuery({
+            sql: SQL_QUERIES.CREATE_STATE_TABLE,
+            params: [],
+        })
+    }
+
+    /**
+     * Run a replication pass across every configured table. Each table is
+     * pulled in order, using the stored watermark to fetch only the rows
+     * that have changed since the previous run.
+     *
+     * @returns one result per configured table, in configuration order
+     * @throws Error when no data source has been captured yet
+     */
+    public async sync(): Promise<ReplicationResult[]> {
+        const dataSource = this.dataSource
+        if (!dataSource) {
+            throw new Error('ReplicatorPlugin not properly initialized')
+        }
+
+        // Tables are processed sequentially; a failure aborts the pass and
+        // propagates to the caller.
+        const results: ReplicationResult[] = []
+        for (const table of this.tables) {
+            results.push(await this.syncTable(dataSource, table))
+        }
+        return results
+    }
+
+    /** Read the stored watermark for a source table, or null if none. */
+    private async getLastValue(
+        dataSource: DataSource,
+        tableName: string
+    ): Promise<string | null> {
+        const rows = (await dataSource.rpc.executeQuery({
+            sql: SQL_QUERIES.GET_LAST_VALUE,
+            params: [tableName],
+        })) as Array<{ last_value: string | null }>
+
+        return rows?.[0]?.last_value ?? null
+    }
+
+    /** Insert or replace the stored watermark for a source table. */
+    private async setLastValue(
+        dataSource: DataSource,
+        tableName: string,
+        lastValue: string
+    ) {
+        await dataSource.rpc.executeQuery({
+            sql: SQL_QUERIES.UPSERT_STATE,
+            params: [tableName, lastValue],
+        })
+    }
+
+    /**
+     * Pull one batch for a single table, upsert every row, then persist the
+     * highest watermark seen.
+     *
+     * NOTE(review): the query uses a strict `>` on the watermark with a
+     * LIMIT, so if a batch ends in the middle of a run of rows sharing one
+     * watermark value, the remaining rows of that run are not selected by
+     * the next pass. Prefer a unique, monotonic watermark column.
+     */
+    private async syncTable(
+        dataSource: DataSource,
+        table: ReplicationTable
+    ): Promise<ReplicationResult> {
+        // State is keyed by the source table name, not the destination.
+        const lastValue = await this.getLastValue(dataSource, table.name)
+        const destTable = table.destTable ?? table.name
+        const quotedSource = quoteExternalIdentifier(
+            table.name,
+            this.external.dialect
+        )
+        const quotedWatermark = quoteExternalIdentifier(
+            table.watermarkColumn,
+            this.external.dialect
+        )
+
+        // First run (no stored watermark) pulls from the start of the table.
+        const whereClause =
+            lastValue !== null ? `WHERE ${quotedWatermark} > ?` : ''
+        const params = lastValue !== null ? [lastValue] : []
+        const selectSql =
+            `SELECT * FROM ${quotedSource} ${whereClause} ORDER BY ${quotedWatermark} ASC LIMIT ${this.batchSize}`.trim()
+
+        const rows = await this.fetchExternal(dataSource, selectSql, params)
+
+        if (!rows || rows.length === 0) {
+            return { table: table.name, rowsReplicated: 0, lastValue }
+        }
+
+        let newLastValue: string | null = lastValue
+        for (const row of rows) {
+            await this.upsertRow(dataSource, destTable, table.primaryKey, row)
+
+            const watermark = row[table.watermarkColumn]
+            if (watermark !== undefined && watermark !== null) {
+                newLastValue = pickHigherWatermark(newLastValue, watermark)
+            }
+        }
+
+        // The watermark is only persisted after every row of the batch was
+        // written, so a failed batch is re-pulled on the next pass.
+        if (newLastValue !== null && newLastValue !== lastValue) {
+            await this.setLastValue(dataSource, table.name, newLastValue)
+        }
+
+        return {
+            table: table.name,
+            rowsReplicated: rows.length,
+            lastValue: newLastValue,
+        }
+    }
+
+    /**
+     * Run a query against the external source by cloning the current data
+     * source with `source: 'external'` and this plugin's connection details.
+     *
+     * @returns the result rows, or an empty array when the result is not an
+     *   array
+     */
+    private async fetchExternal(
+        dataSource: DataSource,
+        sql: string,
+        params: unknown[]
+    ): Promise<Array<Record<string, unknown>>> {
+        const externalDataSource: DataSource = {
+            ...dataSource,
+            source: 'external',
+            external: this.external,
+        }
+
+        const result = await executeSDKQuery({
+            sql,
+            params,
+            dataSource: externalDataSource,
+            config: this.config ?? { role: 'admin' },
+        })
+
+        return Array.isArray(result) ? result : []
+    }
+
+    /**
+     * Insert one row into the destination table, updating the non-key
+     * columns when the primary key already exists.
+     *
+     * @param table validated destination table name
+     * @param primaryKey validated conflict-target column
+     * @param row column/value pairs as returned by the external source
+     */
+    private async upsertRow(
+        dataSource: DataSource,
+        table: string,
+        primaryKey: string,
+        row: Record<string, unknown>
+    ) {
+        const columns = Object.keys(row)
+        if (columns.length === 0) return
+
+        const placeholders = columns.map(() => '?').join(', ')
+        const columnsList = columns.map(quoteInternalIdentifier).join(', ')
+        const updateAssignments = columns
+            .filter((c) => c !== primaryKey)
+            .map((c) => {
+                const quoted = quoteInternalIdentifier(c)
+                return `${quoted} = excluded.${quoted}`
+            })
+            .join(', ')
+
+        // With nothing but the key column there is nothing to update, but a
+        // bare INSERT would fail on a re-pulled row, so ignore the conflict.
+        const conflictTarget = quoteInternalIdentifier(primaryKey)
+        const conflictAction = updateAssignments
+            ? `DO UPDATE SET ${updateAssignments}`
+            : 'DO NOTHING'
+        const sql = `INSERT INTO ${quoteInternalIdentifier(table)} (${columnsList}) VALUES (${placeholders}) ON CONFLICT(${conflictTarget}) ${conflictAction}`
+
+        await dataSource.rpc.executeQuery({
+            sql,
+            params: columns.map((c) => row[c]),
+        })
+    }
+}
+
+/**
+ * Compare two watermark values and return whichever is "higher".
+ *
+ * - Date candidates are stored as ISO-8601 strings, which sort
+ *   lexicographically (the default Date string form does not).
+ * - When both sides are plain integers they are compared as BigInt, so
+ *   id=99 < id=100 and ids beyond 2^53 keep their exact ordering.
+ * - When both sides otherwise parse as finite numbers they are compared
+ *   numerically.
+ * - Anything else falls back to string compare, which already handles ISO
+ *   timestamps correctly.
+ *
+ * @param current the stored watermark, or null when there is none yet
+ * @param candidate the watermark value read from a replicated row
+ * @returns the string form of the higher of the two values
+ */
+function pickHigherWatermark(
+    current: string | null,
+    candidate: unknown
+): string {
+    const candidateStr =
+        candidate instanceof Date && !Number.isNaN(candidate.getTime())
+            ? candidate.toISOString()
+            : String(candidate)
+    if (current === null) return candidateStr
+
+    const currentTrimmed = current.trim()
+    const candidateTrimmed = candidateStr.trim()
+
+    if (
+        INTEGER_PATTERN.test(currentTrimmed) &&
+        INTEGER_PATTERN.test(candidateTrimmed)
+    ) {
+        return BigInt(candidateTrimmed) > BigInt(currentTrimmed)
+            ? candidateStr
+            : current
+    }
+
+    const currentNum = Number(current)
+    const candidateNum = Number(candidateStr)
+    const bothNumeric =
+        currentTrimmed !== '' &&
+        candidateTrimmed !== '' &&
+        Number.isFinite(currentNum) &&
+        Number.isFinite(candidateNum)
+
+    if (bothNumeric) {
+        return candidateNum > currentNum ? candidateStr : current
+    }
+    return candidateStr > current ? candidateStr : current
+}
diff --git a/plugins/replicator/meta.json b/plugins/replicator/meta.json
new file mode 100644
index 0000000..0e4b5e7
--- /dev/null
+++ b/plugins/replicator/meta.json
@@ -0,0 +1,19 @@
+{
+    "version": "1.0.0",
+    "resources": {
+        "tables": {
+            "tmp_replication_state": [
+                "table_name",
+                "last_value",
+                "last_synced_at"
+            ]
+        },
+        "secrets": {},
+        "variables": {}
+    },
+    "dependencies": {
+        "tables": {},
+        "secrets": {},
+        "variables": {}
+    }
+}