Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .bounty_pr.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"status": "ready",
"commit_message": "feat(plugins): add Replicator plugin to mirror external data to internal store",
"pr_title": "feat(plugins): add Replicator plugin to mirror external data",
"pr_body": "## Purpose\n\nCloses #72.\n\nAdds a new `ReplicatorPlugin` under `plugins/replicator/` that pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into the StarbaseDB internal SQLite store. Each replication pass is driven by a per-table watermark column (e.g. `updated_at` or a monotonic `id`) so only rows that changed since the previous run are transferred.\n\n## What changed\n\n- `plugins/replicator/index.ts` β€” new `ReplicatorPlugin` extending `StarbasePlugin`. Creates `tmp_replication_state(table_name, last_value, last_synced_at)` on registration, exposes `sync()` and `POST /replicator/sync` (admin-only), and upserts rows via `INSERT ... ON CONFLICT(primaryKey) DO UPDATE`.\n- Watermark tracking compares **numerically** when both sides parse as numbers, so a monotonic integer `id` column no longer falls into the lexicographic trap (e.g. `\"99\" > \"100\"`). String compare is used otherwise, which still handles ISO timestamps correctly.\n- Identifiers (`name`, `watermarkColumn`, `primaryKey`, `destTable`) are validated against `[A-Za-z_][A-Za-z0-9_]*` at construction time and quoted in the external `SELECT` using dialect-appropriate quoting (backticks for MySQL, double quotes elsewhere) β€” consistent with the existing double-quoted destination-side identifiers.\n- `plugins/replicator/index.test.ts` β€” vitest suite covering constructor validation, identifier validation, state-table creation, initial pull, watermark-bounded pulls, dest-table override, MySQL dialect quoting, and numeric-watermark ordering.\n- `plugins/replicator/README.md` β€” usage, configuration, Cron-plugin scheduling snippet, destination-table DDL template, and a note that large backfills require repeated `sync()` calls because each call is bounded by `batchSize`.\n- `plugins/replicator/meta.json` β€” registry metadata to match the other plugins.\n\n## How it works\n\n1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)`.\n2. Each `sync()` call reads the stored watermark per table, runs `SELECT * FROM \"<table>\" WHERE \"<watermarkColumn>\" > ? ORDER BY \"<watermarkColumn>\" ASC LIMIT <batchSize>` against the external source, and upserts each row into the internal store using `ON CONFLICT(<primaryKey>) DO UPDATE`.\n3. After the batch, the highest watermark seen (numeric or lexicographic depending on the value type) becomes the new stored `last_value`.\n\nScheduling is delegated to the existing [Cron plugin](/plugins/cron/README.md) β€” the README shows the snippet.\n\n## Tasks\n\n- [x] Implement the plugin\n- [x] Add unit tests\n- [x] Document usage, scheduling, and destination-table bootstrapping\n- [x] Validate identifiers and quote them in the external SELECT\n- [x] Compare numeric watermarks numerically\n\n## Verify\n\n- `npx vitest run plugins/replicator/index.test.ts` β€” 11/11 passing.\n- The 4 failures in `src/rls/index.test.ts` from a full `npx vitest run` are pre-existing on `main` and unrelated to this change (confirmed by running the RLS suite on `main`).\n\n## Before\n\n- [x] Branch contains exactly one commit, scoped to `plugins/replicator/*`.\n- [x] No edits to unrelated files.\n",
"branch": "fix/issue-72-starbasedb-replicate-data-from-external",
"tests_run": [
"npx vitest run plugins/replicator/index.test.ts"
],
"tests_passed": true,
"files_changed": [
"plugins/replicator/README.md",
"plugins/replicator/index.test.ts",
"plugins/replicator/index.ts",
"plugins/replicator/meta.json"
],
"notes": "Branch rebased onto main: dropped the 2 unrelated export/dump commits (0034b65, 34b3927) that were on the prior draft, leaving a single replicator commit. Pre-existing failures in src/rls/index.test.ts (4 tests) exist on main and are unrelated to this change."
}
127 changes: 127 additions & 0 deletions plugins/replicator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Replicator Plugin

The Replicator Plugin pulls rows from an external database (Postgres, MySQL, Cloudflare D1, Turso, or another StarbaseDB) into StarbaseDB's internal SQLite store on demand. Each replication pass uses a per-table watermark column (e.g. `updated_at` or a monotonic `id`) so that only rows that changed since the previous run are transferred.

## Usage

```ts
import { ReplicatorPlugin } from '../plugins/replicator'

const replicatorPlugin = new ReplicatorPlugin({
external: {
dialect: 'postgresql',
host: env.EXTERNAL_DB_HOST!,
port: env.EXTERNAL_DB_PORT!,
user: env.EXTERNAL_DB_USER!,
password: env.EXTERNAL_DB_PASS!,
database: env.EXTERNAL_DB_DATABASE!,
},
tables: [
{
name: 'users',
watermarkColumn: 'updated_at',
primaryKey: 'id',
},
{
name: 'orders',
watermarkColumn: 'id',
primaryKey: 'id',
destTable: 'orders_mirror',
},
],
batchSize: 500,
})

const plugins = [
replicatorPlugin,
// ... other plugins
] satisfies StarbasePlugin[]
```

## How To Use

Trigger a replication pass with an admin-authorized POST request:

```bash
curl -X POST https://<your-starbase-instance>/replicator/sync \
-H "Authorization: Bearer $ADMIN_AUTHORIZATION_TOKEN"
```

Each call returns at most `batchSize` rows per table, so the initial
backfill of a large table will require several invocations until every
table reports `rowsReplicated: 0`.

### Bootstrapping the destination table

The replicator does not migrate schemas. Create the destination table on
the StarbaseDB side before the first sync β€” for example:

```sql
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
updated_at TEXT NOT NULL
);
```

The primary key column you pass to the plugin must be the `PRIMARY KEY`
(or have a `UNIQUE` index) so that `ON CONFLICT(...) DO UPDATE` works.

### Scheduling with the Cron plugin

Pair the replicator with the [Cron plugin](/plugins/cron/README.md) so
that `sync()` runs on a schedule:

```ts
import { CronPlugin } from '../plugins/cron'
import { ReplicatorPlugin } from '../plugins/replicator'

const replicatorPlugin = new ReplicatorPlugin({
/* ...as above... */
})

const cronPlugin = new CronPlugin()
cronPlugin.onEvent(async ({ name }) => {
if (name === 'Replicate every minute') {
await replicatorPlugin.sync()
}
}, ctx)

const plugins = [
cronPlugin,
replicatorPlugin,
] satisfies StarbasePlugin[]
```

Then insert a matching task row into `tmp_cron_tasks` (see the Cron
plugin README for the schema).

## Configuration Options

| Option | Type | Default | Description |
| ------------ | ----------------------- | ------------- | ------------------------------------------------------------------------------------------------------ |
| `external` | `ExternalDatabaseSource` | required | Connection details for the external database to replicate from. |
| `tables` | `ReplicationTable[]` | required | List of tables to replicate. See below. |
| `batchSize` | `number` | `1000` | Maximum number of rows to pull per table per `sync()` call. |
| `pathPrefix` | `string` | `/replicator` | URL prefix for the plugin's HTTP routes. |

### `ReplicationTable`

| Field | Type | Default | Description |
| ----------------- | -------- | ---------- | ---------------------------------------------------------------------------------------- |
| `name` | `string` | required | The table name in the external source. |
| `watermarkColumn` | `string` | required | Column used to track replication progress (e.g. `updated_at`, monotonic `id`). |
| `primaryKey` | `string` | required | Column used for upserting rows on the destination side. |
| `destTable` | `string` | `name` | (optional) The destination table name inside StarbaseDB. Defaults to the source name. |

## How It Works

1. On registration the plugin creates `tmp_replication_state(table_name, last_value, last_synced_at)` to track the most recent watermark seen per table.
2. On each `sync()` call the plugin reads the stored watermark, runs `SELECT * FROM <table> WHERE <watermarkColumn> > <last_value> ORDER BY <watermarkColumn> ASC LIMIT <batchSize>` against the external source, and upserts the rows into the internal SQLite store using `ON CONFLICT(<primaryKey>) DO UPDATE`.
3. After all rows are written, the highest watermark observed becomes the new stored `last_value`. Watermark comparison is numeric when both sides parse as numbers (so `id = 100` correctly ranks above `id = 99`) and lexicographic otherwise (which already handles ISO timestamps such as `updated_at`).

> [!NOTE]
> The destination table must already exist with the matching schema (including the primary key). The replicator does not create or migrate tables on the StarbaseDB side.

> [!NOTE]
> Table and column identifiers are validated at construction time and must match `[A-Za-z_][A-Za-z0-9_]*`. Identifiers containing spaces, hyphens, quotes or reserved characters are not supported.
Loading