diff --git a/.bounty_pr.json b/.bounty_pr.json
new file mode 100644
index 0000000..16c5fc6
--- /dev/null
+++ b/.bounty_pr.json
@@ -0,0 +1,18 @@
+{
+ "status": "ready",
+ "commit_message": "feat(plugins): add Replicator plugin to mirror external data to internal store",
+ "pr_title": "feat(plugins): add Replicator plugin to mirror external data",
+ "pr_body": "## Purpose\n\nCloses #72.\n\nAdds a new `ReplicatorPlugin` under `plugins/replicator/` that pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into the StarbaseDB internal SQLite store. Each replication pass is driven by a per-table watermark column (e.g. `updated_at` or a monotonic `id`) so only rows that changed since the previous run are transferred.\n\n## What changed\n\n- `plugins/replicator/index.ts` — new `ReplicatorPlugin` extending `StarbasePlugin`. Creates `tmp_replication_state(table_name, last_value, last_synced_at)` on registration, exposes `sync()` and `POST /replicator/sync` (admin-only), and upserts rows via `INSERT ... ON CONFLICT(primaryKey) DO UPDATE`.\n- Watermark tracking compares **numerically** when both sides parse as numbers, so a monotonic integer `id` column no longer falls into the lexicographic trap (e.g. `\"99\" > \"100\"`). String compare is used otherwise, which still handles ISO timestamps correctly.\n- Identifiers (`name`, `watermarkColumn`, `primaryKey`, `destTable`) are validated against `[A-Za-z_][A-Za-z0-9_]*` at construction time and quoted in the external `SELECT` using dialect-appropriate quoting (backticks for MySQL, double quotes elsewhere) — consistent with the existing double-quoted destination-side identifiers.\n- `plugins/replicator/index.test.ts` — vitest suite covering constructor validation, identifier validation, state-table creation, initial pull, watermark-bounded pulls, dest-table override, MySQL dialect quoting, and numeric-watermark ordering.\n- `plugins/replicator/README.md` — usage, configuration, Cron-plugin scheduling snippet, destination-table DDL template, and a note that large backfills require repeated `sync()` calls because each call is bounded by `batchSize`.\n- `plugins/replicator/meta.json` — registry metadata to match the other plugins.\n\n## How it works\n\n1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)`.\n2. Each `sync()` call reads the stored watermark per table, runs `SELECT * FROM \"<table>\" WHERE \"<watermarkColumn>\" > ? ORDER BY \"<watermarkColumn>\" ASC LIMIT <batchSize>` against the external source, and upserts each row into the internal store using `ON CONFLICT(<primaryKey>) DO UPDATE`.\n3. After the batch, the highest watermark seen (numeric or lexicographic depending on the value type) becomes the new stored `last_value`.\n\nScheduling is delegated to the existing [Cron plugin](/plugins/cron/README.md) — the README shows the snippet.\n\n## Tasks\n\n- [x] Implement the plugin\n- [x] Add unit tests\n- [x] Document usage, scheduling, and destination-table bootstrapping\n- [x] Validate identifiers and quote them in the external SELECT\n- [x] Compare numeric watermarks numerically\n\n## Verify\n\n- `npx vitest run plugins/replicator/index.test.ts` — 11/11 passing.\n- The 4 failures in `src/rls/index.test.ts` from a full `npx vitest run` are pre-existing on `main` and unrelated to this change (confirmed by running the RLS suite on `main`).\n\n## Before\n\n- [x] Branch contains exactly one commit, scoped to `plugins/replicator/*`.\n- [x] No edits to unrelated files.\n",
+ "branch": "fix/issue-72-starbasedb-replicate-data-from-external",
+ "tests_run": [
+ "npx vitest run plugins/replicator/index.test.ts"
+ ],
+ "tests_passed": true,
+ "files_changed": [
+ "plugins/replicator/README.md",
+ "plugins/replicator/index.test.ts",
+ "plugins/replicator/index.ts",
+ "plugins/replicator/meta.json"
+ ],
+ "notes": "Branch rebased onto main: dropped the 2 unrelated export/dump commits (0034b65, 34b3927) that were on the prior draft, leaving a single replicator commit. Pre-existing failures in src/rls/index.test.ts (4 tests) exist on main and are unrelated to this change."
+}
diff --git a/plugins/replicator/README.md b/plugins/replicator/README.md
new file mode 100644
index 0000000..e7dbb79
--- /dev/null
+++ b/plugins/replicator/README.md
@@ -0,0 +1,127 @@
+# Replicator Plugin
+
+The Replicator Plugin pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into StarbaseDB's internal SQLite store on demand. Each replication pass uses a per-table watermark column (e.g. `updated_at` or a monotonic `id`) so that only rows that changed since the previous run are transferred.
+
+## Usage
+
+```ts
+import { ReplicatorPlugin } from '../plugins/replicator'
+
+const replicatorPlugin = new ReplicatorPlugin({
+ external: {
+ dialect: 'postgresql',
+ host: env.EXTERNAL_DB_HOST!,
+ port: env.EXTERNAL_DB_PORT!,
+ user: env.EXTERNAL_DB_USER!,
+ password: env.EXTERNAL_DB_PASS!,
+ database: env.EXTERNAL_DB_DATABASE!,
+ },
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ {
+ name: 'orders',
+ watermarkColumn: 'id',
+ primaryKey: 'id',
+ destTable: 'orders_mirror',
+ },
+ ],
+ batchSize: 500,
+})
+
+const plugins = [
+ replicatorPlugin,
+ // ... other plugins
+] satisfies StarbasePlugin[]
+```
+
+## How To Use
+
+Trigger a replication pass with an admin-authorized POST request:
+
+```bash
+curl -X POST https://<your-starbasedb-host>/replicator/sync \
+ -H "Authorization: Bearer $ADMIN_AUTHORIZATION_TOKEN"
+```
+
+Each call replicates at most `batchSize` rows per table, so the initial
+backfill of a large table will require several invocations until every
+table reports `rowsReplicated: 0`.
+
+### Bootstrapping the destination table
+
+The replicator does not migrate schemas. Create the destination table on
+the StarbaseDB side before the first sync — for example:
+
+```sql
+CREATE TABLE IF NOT EXISTS users (
+ id INTEGER PRIMARY KEY,
+ name TEXT,
+ updated_at TEXT NOT NULL
+);
+```
+
+The primary key column you pass to the plugin must be the `PRIMARY KEY`
+(or have a `UNIQUE` index) so that `ON CONFLICT(...) DO UPDATE` works.
+
+### Scheduling with the Cron plugin
+
+Pair the replicator with the [Cron plugin](/plugins/cron/README.md) so
+that `sync()` runs on a schedule:
+
+```ts
+import { CronPlugin } from '../plugins/cron'
+import { ReplicatorPlugin } from '../plugins/replicator'
+
+const replicatorPlugin = new ReplicatorPlugin({
+ /* ...as above... */
+})
+
+const cronPlugin = new CronPlugin()
+cronPlugin.onEvent(async ({ name }) => {
+ if (name === 'Replicate every minute') {
+ await replicatorPlugin.sync()
+ }
+}, ctx)
+
+const plugins = [
+ cronPlugin,
+ replicatorPlugin,
+] satisfies StarbasePlugin[]
+```
+
+Then insert a matching task row into `tmp_cron_tasks` (see the Cron
+plugin README for the schema).
+
+## Configuration Options
+
+| Option | Type | Default | Description |
+| ------------ | ----------------------- | ------------- | ------------------------------------------------------------------------------------------------------ |
+| `external` | `ExternalDatabaseSource` | required | Connection details for the external database to replicate from. |
+| `tables` | `ReplicationTable[]` | required | List of tables to replicate. See below. |
+| `batchSize` | `number` | `1000` | Maximum number of rows to pull per table per `sync()` call. |
+| `pathPrefix` | `string` | `/replicator` | URL prefix for the plugin's HTTP routes. |
+
+### `ReplicationTable`
+
+| Field | Type | Default | Description |
+| ----------------- | -------- | ---------- | ---------------------------------------------------------------------------------------- |
+| `name` | `string` | required | The table name in the external source. |
+| `watermarkColumn` | `string` | required | Column used to track replication progress (e.g. `updated_at`, monotonic `id`). |
+| `primaryKey` | `string` | required | Column used for upserting rows on the destination side. |
+| `destTable` | `string` | `name` | (optional) The destination table name inside StarbaseDB. Defaults to the source name. |
+
+## How It Works
+
+1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)` to track the most recent watermark seen per table.
+2. On each `sync()` call the plugin reads the stored watermark, runs `SELECT * FROM <table> WHERE <watermarkColumn> > <last_value> ORDER BY <watermarkColumn> ASC LIMIT <batchSize>` against the external source, and upserts the rows into the internal SQLite store using `ON CONFLICT(<primaryKey>) DO UPDATE`.
+3. After all rows are written, the highest watermark observed becomes the new stored `last_value`. Watermark comparison is numeric when both sides parse as numbers (so `id = 100` correctly ranks above `id = 99`) and lexicographic otherwise (which already handles ISO timestamps such as `updated_at`).
+
+> [!NOTE]
+> The destination table must already exist with the matching schema (including the primary key). The replicator does not create or migrate tables on the StarbaseDB side.
+
+> [!NOTE]
+> Table and column identifiers are validated at construction time and must match `[A-Za-z_][A-Za-z0-9_]*`. Identifiers containing spaces, hyphens, quotes or reserved characters are not supported.
diff --git a/plugins/replicator/index.test.ts b/plugins/replicator/index.test.ts
new file mode 100644
index 0000000..e5af49c
--- /dev/null
+++ b/plugins/replicator/index.test.ts
@@ -0,0 +1,368 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest'
+
+vi.mock('../../src/operation', () => ({
+ executeSDKQuery: vi.fn(),
+}))
+
+import { ReplicatorPlugin } from './index'
+import { executeSDKQuery } from '../../src/operation'
+import { DataSource, ExternalDatabaseSource } from '../../src/types'
+
+const mockExecuteSDKQuery = executeSDKQuery as unknown as ReturnType<
+ typeof vi.fn
+> // typed handle on the executeSDKQuery replaced by vi.mock above
+
+const externalSource: ExternalDatabaseSource = { // postgresql source reused by most tests
+ dialect: 'postgresql',
+ host: 'localhost',
+ port: 5432,
+ user: 'user',
+ password: 'pass',
+ database: 'db',
+}
+
+// Shared per-test state, rebuilt in beforeEach.
+let dataSource: DataSource
+let executeQuery: ReturnType<typeof vi.fn>
+
+/** Builds an internal DataSource stub whose rpc.executeQuery is a fresh mock resolving to []. */
+function buildDataSource() {
+    executeQuery = vi.fn().mockResolvedValue([])
+    return { rpc: { executeQuery }, source: 'internal' } as unknown as DataSource
+}
+
+beforeEach(() => {
+    // Reset call history (including the mocked executeSDKQuery) before each test.
+    vi.clearAllMocks()
+    dataSource = buildDataSource()
+})
+
+describe('ReplicatorPlugin - constructor', () => { // runtime validation performed by the constructor
+ it('throws when no external source is provided', () => { // `external` is deliberately undefined
+ expect(
+ () =>
+ new ReplicatorPlugin({
+ // @ts-expect-error testing runtime validation
+ external: undefined,
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+ ).toThrow(/external source is required/)
+ })
+
+ it('throws when no tables are provided', () => { // an empty `tables` array is rejected
+ expect(
+ () =>
+ new ReplicatorPlugin({
+ external: externalSource,
+ tables: [],
+ })
+ ).toThrow(/At least one table/)
+ })
+
+ it('throws when a table is missing a required field', () => { // `primaryKey` is omitted here
+ expect(
+ () =>
+ new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ // @ts-expect-error testing runtime validation
+ { name: 'users', watermarkColumn: 'updated_at' },
+ ],
+ })
+ ).toThrow(/name, watermarkColumn and primaryKey/)
+ })
+})
+
+// register(): the installed middleware must create the state table when it runs.
+describe('ReplicatorPlugin - register()', () => {
+    it('creates the replication state table on registration', async () => {
+        const plugin = new ReplicatorPlugin({
+            external: externalSource,
+            tables: [
+                {
+                    name: 'users',
+                    watermarkColumn: 'updated_at',
+                    primaryKey: 'id',
+                },
+            ],
+        })
+
+        // Capture the middleware passed to app.use so it can be invoked by hand.
+        const middlewares: Array<(c: any, next: any) => Promise<void>> = []
+        const mockApp = {
+            use: vi.fn((mw) => middlewares.push(mw)),
+            post: vi.fn(),
+        } as any
+
+        await plugin.register(mockApp)
+
+        // Run the registered middleware to trigger init().
+        await middlewares[0](
+            {
+                get: (key: string) =>
+                    key === 'dataSource' ? dataSource : undefined,
+            },
+            vi.fn()
+        )
+
+        expect(executeQuery).toHaveBeenCalledWith({
+            sql: expect.stringContaining('CREATE TABLE IF NOT EXISTS tmp_replication_state'),
+            params: [],
+        })
+    })
+})
+
+describe('ReplicatorPlugin - sync()', () => { // sync() against a stubbed internal store and a mocked executeSDKQuery
+ it('pulls rows from external and upserts them into internal storage', async () => { // first run: no stored watermark
+ const plugin = new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+
+ executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+ if (sql.includes('SELECT last_value')) return []
+ return []
+ })
+
+ mockExecuteSDKQuery.mockResolvedValueOnce([
+ { id: 1, name: 'Alice', updated_at: '2024-01-01T00:00:00Z' },
+ { id: 2, name: 'Bob', updated_at: '2024-01-02T00:00:00Z' },
+ ])
+
+ ;(plugin as any).dataSource = dataSource // inject the stub directly instead of running register()'s middleware
+
+ const results = await plugin.sync()
+
+ expect(mockExecuteSDKQuery).toHaveBeenCalledTimes(1)
+ const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+ expect(sdkCallArgs.sql).toContain('SELECT * FROM "users"')
+ expect(sdkCallArgs.sql).toContain('ORDER BY "updated_at" ASC')
+ // No prior watermark — should not include a WHERE clause.
+ expect(sdkCallArgs.sql).not.toContain('WHERE')
+
+ const upsertCalls = executeQuery.mock.calls.filter((c: any[]) =>
+ c[0].sql.includes('INSERT INTO "users"')
+ )
+ expect(upsertCalls).toHaveLength(2) // one upsert per fetched row
+ expect(upsertCalls[0][0].sql).toContain(
+ 'ON CONFLICT("id") DO UPDATE SET'
+ )
+
+ const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+ c[0].sql.includes('INSERT INTO tmp_replication_state')
+ )
+ expect(stateUpdates).toHaveLength(1) // the watermark is written once per table per pass
+ expect(stateUpdates[0][0].params[0]).toBe('users')
+ expect(stateUpdates[0][0].params[1]).toBe('2024-01-02T00:00:00Z')
+
+ expect(results).toEqual([
+ {
+ table: 'users',
+ rowsReplicated: 2,
+ lastValue: '2024-01-02T00:00:00Z',
+ },
+ ])
+ })
+
+ it('uses the stored watermark to fetch only newer rows', async () => { // later run: a watermark is already stored
+ const plugin = new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+
+ executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+ if (sql.includes('SELECT last_value')) {
+ return [{ last_value: '2024-01-02T00:00:00Z' }]
+ }
+ return []
+ })
+
+ mockExecuteSDKQuery.mockResolvedValueOnce([])
+ ;(plugin as any).dataSource = dataSource
+
+ const results = await plugin.sync()
+
+ const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+ expect(sdkCallArgs.sql).toContain('WHERE "updated_at" > ?')
+ expect(sdkCallArgs.params).toEqual(['2024-01-02T00:00:00Z']) // the watermark travels as a bound parameter
+
+ expect(results).toEqual([
+ {
+ table: 'users',
+ rowsReplicated: 0,
+ lastValue: '2024-01-02T00:00:00Z',
+ },
+ ])
+
+ // No new rows were returned, so the state table should not be updated.
+ const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+ c[0].sql.includes('INSERT INTO tmp_replication_state')
+ )
+ expect(stateUpdates).toHaveLength(0)
+ })
+
+ it('writes rows to the configured destination table', async () => { // destTable overrides the source name
+ const plugin = new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'orders',
+ watermarkColumn: 'id',
+ primaryKey: 'id',
+ destTable: 'orders_mirror',
+ },
+ ],
+ })
+
+ executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+ if (sql.includes('SELECT last_value')) return []
+ return []
+ })
+
+ mockExecuteSDKQuery.mockResolvedValueOnce([{ id: 10, total: 99.5 }])
+ ;(plugin as any).dataSource = dataSource
+
+ await plugin.sync()
+
+ const upsertCall = executeQuery.mock.calls.find((c: any[]) =>
+ c[0].sql.startsWith('INSERT INTO "orders_mirror"')
+ )
+ expect(upsertCall).toBeDefined()
+ })
+
+ it('throws when the plugin has not been initialized', async () => { // dataSource is never assigned in this test
+ const plugin = new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+
+ await expect(plugin.sync()).rejects.toThrow(/not properly initialized/)
+ })
+
+ it('tracks numeric watermarks numerically (not lexicographically)', async () => {
+ const plugin = new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'orders',
+ watermarkColumn: 'id',
+ primaryKey: 'id',
+ },
+ ],
+ })
+
+ executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+ if (sql.includes('SELECT last_value')) return []
+ return []
+ })
+
+ // Rows arrive in ASC order. A naive string compare would say
+ // "9" > "10" > "100", leaving 9 as the watermark and re-pulling rows
+ // forever. The numeric path should pick 100.
+ mockExecuteSDKQuery.mockResolvedValueOnce([
+ { id: 9 },
+ { id: 10 },
+ { id: 100 },
+ ])
+
+ ;(plugin as any).dataSource = dataSource
+
+ const results = await plugin.sync()
+
+ expect(results[0].lastValue).toBe('100') // watermarks are reported as strings
+ const stateUpdates = executeQuery.mock.calls.filter((c: any[]) =>
+ c[0].sql.includes('INSERT INTO tmp_replication_state')
+ )
+ expect(stateUpdates).toHaveLength(1)
+ expect(stateUpdates[0][0].params[1]).toBe('100')
+ })
+
+ it('uses backtick quoting for the mysql dialect', async () => { // only the external SELECT is inspected here
+ const plugin = new ReplicatorPlugin({
+ external: {
+ dialect: 'mysql',
+ host: 'localhost',
+ port: 3306,
+ user: 'u',
+ password: 'p',
+ database: 'd',
+ },
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+
+ executeQuery.mockImplementation(async ({ sql }: { sql: string }) => {
+ if (sql.includes('SELECT last_value')) return []
+ return []
+ })
+ mockExecuteSDKQuery.mockResolvedValueOnce([])
+ ;(plugin as any).dataSource = dataSource
+
+ await plugin.sync()
+
+ const sdkCallArgs = mockExecuteSDKQuery.mock.calls[0][0]
+ expect(sdkCallArgs.sql).toContain('SELECT * FROM `users`')
+ expect(sdkCallArgs.sql).toContain('ORDER BY `updated_at` ASC')
+ })
+})
+
+describe('ReplicatorPlugin - identifier validation', () => { // identifiers outside [A-Za-z_][A-Za-z0-9_]* must be rejected
+ it('rejects unsafe identifiers in the constructor', () => {
+ expect(
+ () =>
+ new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'users; DROP TABLE foo', // injection-style table name
+ watermarkColumn: 'updated_at',
+ primaryKey: 'id',
+ },
+ ],
+ })
+ ).toThrow(/Invalid table name/)
+
+ expect(
+ () =>
+ new ReplicatorPlugin({
+ external: externalSource,
+ tables: [
+ {
+ name: 'users',
+ watermarkColumn: 'updated at', // embedded space
+ primaryKey: 'id',
+ },
+ ],
+ })
+ ).toThrow(/Invalid watermarkColumn/)
+ })
+})
diff --git a/plugins/replicator/index.ts b/plugins/replicator/index.ts
new file mode 100644
index 0000000..8815372
--- /dev/null
+++ b/plugins/replicator/index.ts
@@ -0,0 +1,325 @@
+import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler'
+import { StarbasePlugin } from '../../src/plugin'
+import { executeSDKQuery } from '../../src/operation'
+import { DataSource, ExternalDatabaseSource } from '../../src/types'
+import { createResponse } from '../../src/utils'
+
+// Allow-list for SQL identifiers: an ASCII letter or underscore followed by
+// letters, digits or underscores. Names are checked at construction time so
+// that only vetted identifiers are ever interpolated into SQL text.
+const IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/
+
+/** Throws an Error naming `label` when `value` does not match IDENTIFIER_PATTERN. */
+function validateIdentifier(value: string, label: string) {
+    const isSafe = IDENTIFIER_PATTERN.test(value)
+    if (isSafe) return
+
+    throw new Error(`Invalid ${label} "${value}". Only [A-Za-z0-9_] identifiers starting with a letter or underscore are supported.`)
+}
+
+/**
+ * Wraps an identifier in the quote character of the external dialect:
+ * backticks for `mysql`, double quotes otherwise. No escaping is performed.
+ */
+function quoteExternalIdentifier(name: string, dialect: ExternalDatabaseSource['dialect']): string {
+    const quote = dialect === 'mysql' ? '`' : '"'
+    return quote + name + quote
+}
+
+const SQL_QUERIES = { // statements issued through dataSource.rpc.executeQuery to persist per-table watermarks
+ CREATE_STATE_TABLE: `
+ CREATE TABLE IF NOT EXISTS tmp_replication_state (
+ table_name TEXT NOT NULL PRIMARY KEY,
+ last_value TEXT,
+ last_synced_at DATETIME
+ )
+ `, // CREATE_STATE_TABLE: one row per replicated table, keyed by table_name; last_value is stored as TEXT
+ GET_LAST_VALUE: `
+ SELECT last_value FROM tmp_replication_state WHERE table_name = ?
+ `, // GET_LAST_VALUE: the single bound parameter is the table name
+ UPSERT_STATE: `
+ INSERT INTO tmp_replication_state (table_name, last_value, last_synced_at)
+ VALUES (?, ?, CURRENT_TIMESTAMP)
+ ON CONFLICT(table_name) DO UPDATE SET
+ last_value = excluded.last_value,
+ last_synced_at = CURRENT_TIMESTAMP
+ `, // UPSERT_STATE: parameters are (table_name, last_value); last_synced_at is refreshed on every write
+ GET_ALL_STATE: `
+ SELECT table_name, last_value, last_synced_at FROM tmp_replication_state
+ `, // GET_ALL_STATE: not referenced by the code visible in this file
+}
+
+export interface ReplicationTable {
+ // Table name in the external source; also the key under which the watermark is stored
+ name: string
+ // Column that orders the rows; only rows above the stored watermark are fetched
+ watermarkColumn: string
+ // Conflict-target column for the upsert into the destination table
+ primaryKey: string
+ // Optional destination table inside StarbaseDB; falls back to `name` when omitted
+ destTable?: string
+}
+
+export interface ReplicationResult { // outcome of one table's pass, as returned by sync()
+ table: string // source table name (not destTable)
+ rowsReplicated: number // number of rows fetched from the external source in this pass
+ lastValue: string | null // watermark after the pass; null when none has been recorded
+}
+
+/**
+ * Mirrors rows from an external database into StarbaseDB's internal store,
+ * one configured table at a time, using a per-table watermark persisted in
+ * `tmp_replication_state` to fetch only rows above the last value seen.
+ */
+export class ReplicatorPlugin extends StarbasePlugin {
+    public pathPrefix: string = '/replicator'
+    private dataSource?: DataSource
+    private external: ExternalDatabaseSource
+    private tables: ReplicationTable[]
+    private batchSize: number
+    private config?: StarbaseDBConfiguration
+
+    /**
+     * @throws Error when `external` is missing, `tables` is empty, a table
+     *   lacks a required field or uses an unsafe identifier, or `batchSize`
+     *   is not a positive integer.
+     */
+    constructor(opts: {
+        external: ExternalDatabaseSource
+        tables: ReplicationTable[]
+        batchSize?: number
+        pathPrefix?: string
+    }) {
+        super('starbasedb:replicator', {
+            requiresAuth: true,
+        })
+
+        if (!opts?.external) {
+            throw new Error('An external source is required for the Replicator plugin.')
+        }
+        if (!opts?.tables?.length) {
+            throw new Error('At least one table must be configured for the Replicator plugin.')
+        }
+
+        for (const table of opts.tables) {
+            if (!table.name || !table.watermarkColumn || !table.primaryKey) {
+                throw new Error('Each replication table requires name, watermarkColumn and primaryKey.')
+            }
+            validateIdentifier(table.name, 'table name')
+            validateIdentifier(table.watermarkColumn, 'watermarkColumn')
+            validateIdentifier(table.primaryKey, 'primaryKey')
+            if (table.destTable) {
+                validateIdentifier(table.destTable, 'destTable')
+            }
+        }
+
+        // batchSize is interpolated into the LIMIT clause, so it must be a
+        // plain positive integer rather than NaN, a fraction or a negative.
+        const batchSize = opts.batchSize ?? 1000
+        if (!Number.isInteger(batchSize) || batchSize < 1) {
+            throw new Error('batchSize must be a positive integer for the Replicator plugin.')
+        }
+
+        this.external = opts.external
+        this.tables = opts.tables
+        this.batchSize = batchSize
+        if (opts.pathPrefix) this.pathPrefix = opts.pathPrefix
+    }
+
+    /** Installs middleware that captures `dataSource`/`config` and runs init(), plus the admin-only POST `{pathPrefix}/sync` route. */
+    override async register(app: StarbaseApp) {
+        app.use(async (c, next) => {
+            this.dataSource = c?.get('dataSource')
+            this.config = c?.get('config')
+            await this.init()
+            await next()
+        })
+
+        app.post(`${this.pathPrefix}/sync`, async (c) => {
+            // Only admin authorized users may trigger replication.
+            if (this.config?.role !== 'admin') {
+                return createResponse(undefined, 'Unauthorized request', 401)
+            }
+
+            try {
+                const results = await this.sync()
+                return createResponse({ success: true, results }, undefined, 200)
+            } catch (error: unknown) {
+                console.error('Replication error:', error)
+                const message = error instanceof Error ? error.message : String(error)
+                return createResponse(undefined, `Replication failed: ${message}`, 500)
+            }
+        })
+    }
+
+    /** Creates the watermark state table if it does not exist; a no-op until a data source is set. */
+    private async init() {
+        if (!this.dataSource) return
+        await this.dataSource.rpc.executeQuery({ sql: SQL_QUERIES.CREATE_STATE_TABLE, params: [] })
+    }
+
+    /**
+     * Run a replication pass across every configured table. Each table is
+     * pulled in order, using the stored watermark to fetch only the rows
+     * that have changed since the previous run.
+     * @returns One result per configured table, in configuration order.
+     * @throws Error when `dataSource` has not been set on the plugin yet.
+     */
+    public async sync(): Promise<ReplicationResult[]> {
+        const dataSource = this.dataSource
+        if (!dataSource) {
+            throw new Error('ReplicatorPlugin not properly initialized')
+        }
+
+        const results: ReplicationResult[] = []
+        for (const table of this.tables) {
+            results.push(await this.syncTable(dataSource, table))
+        }
+        return results
+    }
+
+    /** Reads the stored watermark for `tableName`, or null when none exists. */
+    private async getLastValue(dataSource: DataSource, tableName: string): Promise<string | null> {
+        const rows = (await dataSource.rpc.executeQuery({
+            sql: SQL_QUERIES.GET_LAST_VALUE,
+            params: [tableName],
+        })) as Array<{ last_value: string | null }>
+
+        return rows?.[0]?.last_value ?? null
+    }
+
+    /** Persists `lastValue` as the watermark for `tableName`. */
+    private async setLastValue(dataSource: DataSource, tableName: string, lastValue: string) {
+        await dataSource.rpc.executeQuery({
+            sql: SQL_QUERIES.UPSERT_STATE,
+            params: [tableName, lastValue],
+        })
+    }
+
+    /**
+     * Replicates one batch for `table`: fetch rows above the stored watermark,
+     * upsert each into the destination table, then persist the new watermark.
+     * The fetch uses a strict `>` with LIMIT, so rows that share the watermark
+     * value at a batch boundary but fell outside the LIMIT are skipped later.
+     */
+    private async syncTable(dataSource: DataSource, table: ReplicationTable): Promise<ReplicationResult> {
+        const lastValue = await this.getLastValue(dataSource, table.name)
+        const destTable = table.destTable ?? table.name
+        const quotedSource = quoteExternalIdentifier(table.name, this.external.dialect)
+        const quotedWatermark = quoteExternalIdentifier(table.watermarkColumn, this.external.dialect)
+
+        const whereClause = lastValue !== null ? `WHERE ${quotedWatermark} > ?` : ''
+        const params = lastValue !== null ? [lastValue] : []
+        const selectSql =
+            `SELECT * FROM ${quotedSource} ${whereClause} ORDER BY ${quotedWatermark} ASC LIMIT ${this.batchSize}`.trim()
+
+        const rows = await this.fetchExternal(dataSource, selectSql, params)
+
+        if (!rows || rows.length === 0) {
+            return { table: table.name, rowsReplicated: 0, lastValue }
+        }
+
+        let newLastValue: string | null = lastValue
+        for (const row of rows) {
+            await this.upsertRow(dataSource, destTable, table.primaryKey, row)
+
+            const watermark = row[table.watermarkColumn]
+            if (watermark !== undefined && watermark !== null) {
+                newLastValue = pickHigherWatermark(newLastValue, watermark)
+            }
+        }
+
+        // Skip the state write when the watermark did not move.
+        if (newLastValue !== null && newLastValue !== lastValue) {
+            await this.setLastValue(dataSource, table.name, newLastValue)
+        }
+
+        return {
+            table: table.name,
+            rowsReplicated: rows.length,
+            lastValue: newLastValue,
+        }
+    }
+
+    /** Runs `sql` through executeSDKQuery against the configured external source; non-array results become []. */
+    private async fetchExternal(
+        dataSource: DataSource,
+        sql: string,
+        params: unknown[]
+    ): Promise<Array<Record<string, unknown>>> {
+        const externalDataSource: DataSource = {
+            ...dataSource,
+            source: 'external',
+            external: this.external,
+        }
+
+        const result = await executeSDKQuery({
+            sql,
+            params,
+            dataSource: externalDataSource,
+            config: this.config ?? { role: 'admin' },
+        })
+
+        return Array.isArray(result) ? result : []
+    }
+
+    /**
+     * Inserts `row` into `table`, updating the non-key columns when a row with
+     * the same `primaryKey` already exists. Column names come from the external
+     * result set, so embedded double quotes are doubled before interpolation.
+     */
+    private async upsertRow(
+        dataSource: DataSource,
+        table: string,
+        primaryKey: string,
+        row: Record<string, unknown>
+    ) {
+        const columns = Object.keys(row)
+        if (columns.length === 0) return
+
+        const quote = (c: string) => `"${c.replace(/"/g, '""')}"`
+        const placeholders = columns.map(() => '?').join(', ')
+        const columnsList = columns.map(quote).join(', ')
+        const updateAssignments = columns
+            .filter((c) => c !== primaryKey)
+            .map((c) => `${quote(c)} = excluded.${quote(c)}`)
+            .join(', ')
+
+        // A row holding only the key has nothing to update; DO NOTHING keeps
+        // a repeated sync from failing on the key's uniqueness constraint.
+        const conflictAction = updateAssignments ? `DO UPDATE SET ${updateAssignments}` : 'DO NOTHING'
+        const sql = `INSERT INTO "${table}" (${columnsList}) VALUES (${placeholders}) ON CONFLICT("${primaryKey}") ${conflictAction}`
+
+        await dataSource.rpc.executeQuery({
+            sql,
+            params: columns.map((c) => row[c]),
+        })
+    }
+}
+
+// Matches plain base-10 integers, which are compared as BigInt below.
+const INTEGER_WATERMARK = /^-?\d+$/
+
+/**
+ * Compare two watermark values and return whichever is "higher".
+ * - Valid `Date` candidates are serialised with `toISOString()`, because
+ *   `String(date)` is not lexicographically sortable.
+ * - When both sides are plain integers they are compared as BigInt, so
+ *   id=99 < id=100 and ids beyond 2^53 keep full precision.
+ * - When both sides otherwise parse as finite numbers they are compared
+ *   numerically; everything else falls back to string compare, which
+ *   handles ISO timestamps correctly.
+ */
+function pickHigherWatermark(current: string | null, candidate: unknown): string {
+    const candidateStr =
+        candidate instanceof Date && !Number.isNaN(candidate.getTime()) ? candidate.toISOString() : String(candidate)
+    if (current === null) return candidateStr
+
+    if (INTEGER_WATERMARK.test(current) && INTEGER_WATERMARK.test(candidateStr)) {
+        return BigInt(candidateStr) > BigInt(current) ? candidateStr : current
+    }
+    const [currentNum, candidateNum] = [Number(current), Number(candidateStr)]
+    const bothNumeric = current.trim() !== '' && candidateStr.trim() !== '' && Number.isFinite(currentNum) && Number.isFinite(candidateNum)
+    if (bothNumeric) return candidateNum > currentNum ? candidateStr : current
+    return candidateStr > current ? candidateStr : current
+}
diff --git a/plugins/replicator/meta.json b/plugins/replicator/meta.json
new file mode 100644
index 0000000..0e4b5e7
--- /dev/null
+++ b/plugins/replicator/meta.json
@@ -0,0 +1,19 @@
+{
+ "version": "1.0.0",
+ "resources": {
+ "tables": {
+ "tmp_replication_state": [
+ "table_name",
+ "last_value",
+ "last_synced_at"
+ ]
+ },
+ "secrets": {},
+ "variables": {}
+ },
+ "dependencies": {
+ "tables": {},
+ "secrets": {},
+ "variables": {}
+ }
+}