diff --git a/README.md b/README.md index 5931b1c..88e1508 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,54 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \ +
+```toml
+[[r2_buckets]]
+binding = "DATABASE_DUMPS"
+bucket_name = "starbasedb-dumps"
+```
+
+
+Kick off the job (returns 202 with a `jobId`):
+
+
+```bash
+curl -X POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
+--header 'Authorization: Bearer ABC123' \
+--header 'Content-Type: application/json' \
+--data '{ "format": "sql", "callbackUrl": "https://hooks.example.com/dump-done" }'
+```
+
+
+`format` may be `sql`, `csv`, or `json`. Optional fields: `callbackUrl` (receives a POST containing the job's status view once the job finishes, whether it completed or failed), `table` (export a single table only), `chunkSize` (rows per SELECT batch; default 1000).
+
+Poll the status, then download once it reads `completed`:
+
+
+```bash
+curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/status/JOB_ID' \
+--header 'Authorization: Bearer ABC123'
+
+curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/download/JOB_ID' \
+--header 'Authorization: Bearer ABC123' \
+--output database_dump.sql
+```
+
+
+To cancel an in-flight job:
+
+
+```bash
+curl -X DELETE 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/JOB_ID' \
+--header 'Authorization: Bearer ABC123'
+```
+
+
diff --git a/src/do.ts b/src/do.ts
index b6bb2b6..5bb4468 100644
--- a/src/do.ts
+++ b/src/do.ts
@@ -1,4 +1,17 @@
import { DurableObject } from 'cloudflare:workers'
+import { runTick } from './export/dump-engine'
+import { createDumpHost, jobStateKey } from './export/do-dump-host'
+import {
+ DumpFormat,
+ DumpJobOptions,
+ DumpJobState,
+ DumpJobStatusView,
+ newJobState,
+ toStatusView,
+} from './export/streaming-dump'
+
+// Durable Object storage key that holds the jobId of the dump job currently
+// being driven by alarms.
+const ACTIVE_DUMP_KEY = 'dump:active'
+// Delay, in milliseconds, before the alarm that runs the next dump tick fires.
+const DUMP_RESUME_DELAY_MS = 1_000
export class StarbaseDBDurableObject extends DurableObject {
// Durable storage for the SQL database
@@ -9,6 +22,8 @@ export class StarbaseDBDurableObject extends DurableObject {
public connections = new Map()
// Store the client auth token for requests back to our Worker
private clientAuthToken: string
+ // R2 bucket binding for streaming dump output (optional).
+ private dumpBucket: R2Bucket | undefined
/**
* The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
@@ -22,6 +37,7 @@ export class StarbaseDBDurableObject extends DurableObject {
this.clientAuthToken = env.CLIENT_AUTHORIZATION_TOKEN
this.sql = ctx.storage.sql
this.storage = ctx.storage
+ this.dumpBucket = env.DATABASE_DUMPS
// Install default necessary `tmp_` tables for various features here.
const cacheStatement = `
@@ -72,6 +88,10 @@ export class StarbaseDBDurableObject extends DurableObject {
deleteAlarm: this.deleteAlarm.bind(this),
getStatistics: this.getStatistics.bind(this),
executeQuery: this.executeQuery.bind(this),
+ startDumpJob: this.startDumpJob.bind(this),
+ getDumpJob: this.getDumpJob.bind(this),
+ getDumpDownloadBody: this.getDumpDownloadBody.bind(this),
+ cancelDumpJob: this.cancelDumpJob.bind(this),
}
}
@@ -105,12 +125,28 @@ export class StarbaseDBDurableObject extends DurableObject {
}
async alarm() {
+ // Dispatch streaming dump work first so a long-running export does not
+ // get starved by other alarm-driven features. The dump engine sets its
+ // own continuation alarm if it needs another tick.
+ try {
+ await this.continueActiveDumpJob()
+ } catch (err) {
+ console.error('Failed to continue dump job:', err)
+ }
+
try {
// Fetch all the tasks that are marked to emit an event for this cycle.
- const task = (await this.executeQuery({
- sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
- isRaw: false,
- })) as Record[]
+ // The cron table is created lazily by the CronPlugin; if it does not
+ // exist yet (fresh DO with no cron plugin installed) we silently skip.
+ let task: Record[] = []
+ try {
+ task = (await this.executeQuery({
+ sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
+ isRaw: false,
+ })) as Record[]
+ } catch {
+ return
+ }
if (!task.length) {
return
@@ -148,6 +184,219 @@ export class StarbaseDBDurableObject extends DurableObject {
}
}
+ // -- Streaming dump job machinery -------------------------------------
+
+ /**
+  * Begin a new streaming dump job.
+  *
+  * Resolves the tables to export, opens the R2 multipart upload, persists
+  * the initial job state, records the job under ACTIVE_DUMP_KEY and
+  * schedules an alarm DUMP_RESUME_DELAY_MS from now to run the first tick.
+  *
+  * @param options - Dump options. `format` and `table` are read here; the
+  *   whole object is handed to `newJobState`.
+  * @returns A status view the worker can hand back to the client right away.
+  * @throws If the DATABASE_DUMPS binding is missing, or if another dump job
+  *   on this Durable Object has not reached a terminal status yet.
+  */
+ public async startDumpJob(
+     options: DumpJobOptions
+ ): Promise<DumpJobStatusView> {
+     if (!this.dumpBucket) {
+         throw new Error(
+             'Streaming dump requires the DATABASE_DUMPS R2 binding. ' +
+                 'Add an [[r2_buckets]] entry to wrangler.toml.'
+         )
+     }
+
+     // Only one job is tracked under ACTIVE_DUMP_KEY. Overwriting it while a
+     // job is still running would strand that job (never ticked again, its
+     // multipart upload never completed or aborted), so refuse instead.
+     const activeId = await this.storage.get<string>(ACTIVE_DUMP_KEY)
+     if (activeId) {
+         const active = await this.storage.get<DumpJobState>(
+             jobStateKey(activeId)
+         )
+         const stillRunning =
+             active !== undefined &&
+             active.status !== 'completed' &&
+             active.status !== 'failed' &&
+             active.status !== 'cancelled'
+         if (stillRunning) {
+             throw new Error(
+                 `A dump job is already in progress (jobId: ${activeId}). ` +
+                     'Wait for it to finish or cancel it before starting another.'
+             )
+         }
+     }
+
+     const tables = await this.listUserTables(options.table)
+     const jobId = crypto.randomUUID()
+     const state = newJobState(jobId, options, tables)
+
+     // Open the R2 multipart upload up-front so part numbers and uploadId
+     // are stable across alarm-driven continuations.
+     const upload = await this.dumpBucket.createMultipartUpload(
+         state.objectKey,
+         {
+             httpMetadata: {
+                 contentType: this.contentTypeFor(options.format),
+                 contentDisposition: `attachment; filename="${state.objectKey.split('/').pop()}"`,
+             },
+         }
+     )
+     state.uploadId = upload.uploadId
+
+     await this.storage.put(jobStateKey(jobId), state)
+     await this.storage.put(ACTIVE_DUMP_KEY, jobId)
+     await this.setAlarm(Date.now() + DUMP_RESUME_DELAY_MS)
+     return toStatusView(state)
+ }
+
+ /**
+  * Read the current state of a dump job for the status endpoint.
+  *
+  * @param jobId - Identifier returned by `startDumpJob`.
+  * @returns The job's status view, or `null` when no state is stored for
+  *   that id.
+  */
+ public async getDumpJob(jobId: string): Promise<DumpJobStatusView | null> {
+     // The explicit type argument is required: an untyped `get` yields
+     // `unknown`, which cannot be passed to `toStatusView`.
+     const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
+     if (!state) return null
+     return toStatusView(state)
+ }
+
+ /**
+  * Fetch the finished dump object from R2 so the worker can relay it to the
+  * client.
+  *
+  * @param jobId - Identifier returned by `startDumpJob`.
+  * @returns The object's body stream, size, content type and filename, or
+  *   `null` when the R2 binding is missing, the job is unknown, the job is
+  *   not `completed`, or the object is absent from the bucket.
+  */
+ public async getDumpDownloadBody(jobId: string): Promise<{
+     body: ReadableStream
+     size: number
+     contentType: string
+     filename: string
+ } | null> {
+     if (!this.dumpBucket) return null
+     // Typed read: without the type argument `state` is `unknown` and the
+     // property accesses below do not type-check.
+     const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
+     if (!state || state.status !== 'completed') return null
+     const obj = await this.dumpBucket.get(state.objectKey)
+     if (!obj) return null
+     return {
+         body: obj.body,
+         size: obj.size,
+         // Prefer the content type stored with the object; fall back to the
+         // one derived from the job's format.
+         contentType:
+             obj.httpMetadata?.contentType ??
+             this.contentTypeFor(state.format),
+         // Last path segment of the object key.
+         filename: state.objectKey.split('/').pop() ?? state.objectKey,
+     }
+ }
+
+ /**
+  * Cancel an in-flight dump job.
+  *
+  * Aborts the R2 multipart upload and deletes the pending temp object (both
+  * best-effort), marks the job `cancelled`, and clears ACTIVE_DUMP_KEY when
+  * it points at this job.
+  *
+  * @param jobId - Identifier returned by `startDumpJob`.
+  * @returns The job's status view, or `null` when the job is unknown. Jobs
+  *   already in a terminal status (`completed`, `failed`, `cancelled`) are
+  *   returned unchanged.
+  */
+ public async cancelDumpJob(
+     jobId: string
+ ): Promise<DumpJobStatusView | null> {
+     const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
+     if (!state) return null
+     // Terminal jobs have nothing left to abort. Including 'cancelled' makes
+     // a repeated cancel a no-op instead of re-aborting the upload and
+     // bumping updatedAt.
+     if (
+         state.status === 'completed' ||
+         state.status === 'failed' ||
+         state.status === 'cancelled'
+     )
+         return toStatusView(state)
+
+     if (this.dumpBucket && state.uploadId) {
+         try {
+             const upload = this.dumpBucket.resumeMultipartUpload(
+                 state.objectKey,
+                 state.uploadId
+             )
+             await upload.abort()
+         } catch (err) {
+             // Best-effort: a failed abort must not block the cancellation.
+             console.error('Failed to abort R2 multipart upload:', err)
+         }
+     }
+     if (this.dumpBucket && state.pendingTempKey) {
+         // Best-effort cleanup of the carried-over buffer object.
+         await this.dumpBucket.delete(state.pendingTempKey).catch(() => {})
+     }
+     state.status = 'cancelled'
+     state.error = 'Cancelled by user'
+     state.progress.updatedAt = Date.now()
+     await this.storage.put(jobStateKey(jobId), state)
+     const active = await this.storage.get<string>(ACTIVE_DUMP_KEY)
+     if (active === jobId) await this.storage.delete(ACTIVE_DUMP_KEY)
+     return toStatusView(state)
+ }
+
+ /**
+  * Run one tick of the active dump job (if any). Called from alarm().
+  *
+  * Persists the state after the tick, re-arms the alarm when more work
+  * remains, and fires the callback (if configured) when the job completes
+  * or fails. Stale ACTIVE_DUMP_KEY entries (missing binding, missing state,
+  * terminal status) are cleared without running a tick.
+  *
+  * If the job was cancelled while the tick was running, the persisted
+  * `cancelled` state is left untouched rather than being overwritten.
+  */
+ private async continueActiveDumpJob(): Promise<void> {
+     const activeId = await this.storage.get<string>(ACTIVE_DUMP_KEY)
+     if (!activeId) return
+     if (!this.dumpBucket) {
+         console.error(
+             'Active dump job exists but DATABASE_DUMPS R2 binding is missing.'
+         )
+         await this.storage.delete(ACTIVE_DUMP_KEY)
+         return
+     }
+     const bucket = this.dumpBucket
+     const state = await this.storage.get<DumpJobState>(
+         jobStateKey(activeId)
+     )
+     if (!state) {
+         await this.storage.delete(ACTIVE_DUMP_KEY)
+         return
+     }
+     if (
+         state.status === 'completed' ||
+         state.status === 'failed' ||
+         state.status === 'cancelled'
+     ) {
+         await this.storage.delete(ACTIVE_DUMP_KEY)
+         return
+     }
+
+     const host = createDumpHost({
+         sql: this.sql,
+         storage: this.storage,
+         bucket,
+     })
+
+     // cancelDumpJob can be delivered while runTick is awaiting R2. In that
+     // case storage holds a 'cancelled' state that the in-memory `state`
+     // knows nothing about; writing `state` back would resurrect the job or
+     // relabel it 'failed'. Returns true when the cancellation must win.
+     const cancelledMeanwhile = async (): Promise<boolean> => {
+         const persisted = await this.storage.get<DumpJobState>(
+             jobStateKey(state.jobId)
+         )
+         if (persisted?.status !== 'cancelled') return false
+         // The tick may have written a pending buffer after the cancel
+         // removed the previous one; clean it up best-effort.
+         if (state.pendingTempKey) {
+             await bucket.delete(state.pendingTempKey).catch(() => {})
+         }
+         const active = await this.storage.get<string>(ACTIVE_DUMP_KEY)
+         if (active === state.jobId) {
+             await this.storage.delete(ACTIVE_DUMP_KEY)
+         }
+         return true
+     }
+
+     try {
+         const { done } = await runTick(state, host)
+         // A completed upload is kept as-is; only an unfinished job yields
+         // to a concurrent cancellation.
+         if (!done && (await cancelledMeanwhile())) return
+         await this.storage.put(jobStateKey(state.jobId), state)
+         if (done) {
+             await this.storage.delete(ACTIVE_DUMP_KEY)
+             if (state.callbackUrl) {
+                 this.fireDumpCallback(state).catch((err) =>
+                     console.error('Dump callback failed:', err)
+                 )
+             }
+         } else {
+             await this.setAlarm(Date.now() + DUMP_RESUME_DELAY_MS)
+         }
+     } catch (err) {
+         console.error('Dump engine error:', err)
+         // Aborting the multipart upload from cancelDumpJob makes the
+         // engine's next R2 call throw; report that as the cancellation it
+         // is, not as a failure.
+         if (await cancelledMeanwhile()) return
+         // The engine sets state.status='failed' before throwing. Persist
+         // the failure so callers see it via the status endpoint.
+         state.status = 'failed'
+         state.error =
+             err instanceof Error ? err.message : String(err ?? 'unknown')
+         state.progress.updatedAt = Date.now()
+         await this.storage.put(jobStateKey(state.jobId), state)
+         await this.storage.delete(ACTIVE_DUMP_KEY)
+         if (state.callbackUrl) {
+             this.fireDumpCallback(state).catch((cbErr) =>
+                 console.error('Dump failure callback failed:', cbErr)
+             )
+         }
+     }
+ }
+
+ /**
+  * POST the job's status view to its callback URL, if one is configured.
+  *
+  * Never rejects: network errors and non-2xx responses are logged and
+  * otherwise ignored so a broken webhook cannot affect the dump job.
+  *
+  * @param state - Job whose status view is sent as the JSON body.
+  */
+ private async fireDumpCallback(state: DumpJobState): Promise<void> {
+     if (!state.callbackUrl) return
+     try {
+         const response = await fetch(state.callbackUrl, {
+             method: 'POST',
+             headers: { 'Content-Type': 'application/json' },
+             body: JSON.stringify(toStatusView(state)),
+         })
+         // fetch only rejects on network failure; surface HTTP-level
+         // rejections (4xx/5xx) too so a misconfigured endpoint is visible.
+         if (!response.ok) {
+             console.error(
+                 `Dump completion callback returned HTTP ${response.status}`
+             )
+         }
+         // The body is not needed; release it instead of leaving it unread.
+         await response.body?.cancel()
+     } catch (err) {
+         console.error('Failed to fire dump completion callback:', err)
+     }
+ }
+
+ /**
+  * List the tables to export.
+  *
+  * Without `only`, returns every table in sqlite_master except those whose
+  * names start with `sqlite_`, `tmp_` or `_cf_`, ordered by name, so that
+  * SQLite internals and the tmp_* feature tables do not leak into the
+  * export.
+  *
+  * @param only - When given, restricts the result to that single table. It
+  *   is only checked for existence; the prefix exclusions above are not
+  *   applied to an explicitly requested table.
+  * @returns Table names to dump; empty when `only` names no existing table.
+  */
+ private async listUserTables(only?: string): Promise<string[]> {
+     if (only) {
+         const exists = (await this.executeQuery({
+             sql: 'SELECT name FROM sqlite_master WHERE type = ? AND name = ?;',
+             params: ['table', only],
+             isRaw: false,
+         })) as Record<string, unknown>[]
+         return exists.length ? [only] : []
+     }
+     // `_` is a single-character wildcard in LIKE, so an unescaped 'tmp_%'
+     // also matches user tables such as "tmplates" and would silently drop
+     // them from the dump. ESCAPE '\' makes the underscores literal.
+     const rows = (await this.executeQuery({
+         sql:
+             "SELECT name FROM sqlite_master WHERE type = 'table' " +
+             "AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\' " +
+             "AND name NOT LIKE 'tmp\\_%' ESCAPE '\\' " +
+             "AND name NOT LIKE '\\_cf\\_%' ESCAPE '\\' " +
+             'ORDER BY name;',
+         isRaw: false,
+     })) as Record<string, unknown>[]
+     return rows.map((r) => String(r.name))
+ }
+
+ /**
+  * Map a dump format to the MIME type used for the stored object.
+  * Anything other than `csv` or `json` is served as `application/sql`.
+  */
+ private contentTypeFor(format: DumpFormat): string {
+     switch (format) {
+         case 'csv':
+             return 'text/csv'
+         case 'json':
+             return 'application/json'
+         default:
+             return 'application/sql'
+     }
+ }
+
public async getStatistics(): Promise<{
databaseSize: number
activeConnections: number
diff --git a/src/export/do-dump-host.ts b/src/export/do-dump-host.ts
new file mode 100644
index 0000000..a323320
--- /dev/null
+++ b/src/export/do-dump-host.ts
@@ -0,0 +1,95 @@
+/**
+ * Durable Object adapter for the streaming dump engine.
+ *
+ * Bridges the abstract DumpEngineHost interface (defined in dump-engine.ts)
+ * to the concrete Cloudflare APIs: ctx.storage.sql, ctx.storage for state,
+ * the R2 bucket binding for parts and pending buffer.
+ */
+
+import type { DumpEngineHost, RowCursor } from './dump-engine'
+import { DumpJobState } from './streaming-dump'
+
+/** Prefix shared by every storage key that holds a dump job's state. */
+const STATE_KEY_PREFIX = 'dump:job:'
+
+/** Build the storage key under which the state for `jobId` is kept. */
+export function jobStateKey(jobId: string): string {
+    return `${STATE_KEY_PREFIX}${jobId}`
+}
+
+/** Runtime handles that createDumpHost wraps into a DumpEngineHost. */
+export interface DumpHostOptions {
+ // SQL handle used to execute the dump's queries.
+ sql: SqlStorage
+ // Storage where the job state is persisted.
+ storage: DurableObjectStorage
+ // R2 bucket used for multipart parts and the pending buffer object.
+ bucket: R2Bucket
+}
+
+/**
+ * Build the DumpEngineHost implementation backed by the given SQL handle,
+ * storage and R2 bucket.
+ *
+ * @param opts - The SQL handle, state storage and R2 bucket to wrap.
+ * @returns A host whose methods delegate to those handles.
+ */
+export function createDumpHost(opts: DumpHostOptions): DumpEngineHost {
+    const { sql, storage, bucket } = opts
+
+    return {
+        query(sqlText: string, params?: unknown[]): RowCursor {
+            const cursor =
+                params && params.length
+                    ? sql.exec(sqlText, ...(params as any[]))
+                    : sql.exec(sqlText)
+
+            // SqlStorageCursor exposes a JS iterator directly, but we adapt it
+            // to the engine's RowCursor.next() contract (returns null at end
+            // instead of an IteratorResult with done set).
+            return {
+                columns: cursor.columnNames as string[],
+                next(): Record<string, unknown> | null {
+                    const r = cursor.next()
+                    if (r.done) return null
+                    return r.value as Record<string, unknown>
+                },
+            }
+        },
+
+        async saveState(state: DumpJobState): Promise<void> {
+            await storage.put(jobStateKey(state.jobId), state)
+        },
+
+        async uploadPart(
+            uploadId: string,
+            key: string,
+            partNumber: number,
+            body: Uint8Array
+        ): Promise<R2UploadedPart> {
+            const upload = bucket.resumeMultipartUpload(key, uploadId)
+            // `uploadPart` accepts ArrayBuffer / ArrayBufferView. We hand it
+            // a private copy so the caller's buffer cannot be mutated while
+            // the upload is in-flight.
+            const copy = new Uint8Array(body.byteLength)
+            copy.set(body)
+            return await upload.uploadPart(partNumber, copy)
+        },
+
+        async completeUpload(
+            uploadId: string,
+            key: string,
+            parts: R2UploadedPart[]
+        ): Promise<void> {
+            const upload = bucket.resumeMultipartUpload(key, uploadId)
+            await upload.complete(parts)
+        },
+
+        async abortUpload(uploadId: string, key: string): Promise<void> {
+            const upload = bucket.resumeMultipartUpload(key, uploadId)
+            await upload.abort()
+        },
+
+        async readPending(key: string): Promise<Uint8Array | null> {
+            const obj = await bucket.get(key)
+            if (!obj) return null
+            const buf = await obj.arrayBuffer()
+            return new Uint8Array(buf)
+        },
+
+        async writePending(key: string, bytes: Uint8Array): Promise<void> {
+            await bucket.put(key, bytes)
+        },
+
+        async deletePending(key: string): Promise<void> {
+            await bucket.delete(key)
+        },
+    }
+}
diff --git a/src/export/dump-engine.test.ts b/src/export/dump-engine.test.ts
new file mode 100644
index 0000000..1d1107e
--- /dev/null
+++ b/src/export/dump-engine.test.ts
@@ -0,0 +1,364 @@
+/**
+ * Tests for the streaming dump engine.
+ *
+ * These exercise the engine against a pure-JS fake host so we can drive
+ * deterministic scenarios (multiple ticks, mid-table yields, schema fan-out)
+ * without touching Cloudflare runtime APIs.
+ */
+
+import { describe, expect, it } from 'vitest'
+import { runTick, RowCursor, DumpEngineHost } from './dump-engine'
+import {
+ DumpFormat,
+ DumpJobState,
+ R2_MIN_PART_SIZE,
+ newJobState,
+} from './streaming-dump'
+
+/** In-memory table fixture served by FakeHost. */
+type Table = {
+    name: string
+    columns: string[]
+    // One object per row, keyed by column name. `Record` needs its type
+    // arguments to compile.
+    rows: Record<string, unknown>[]
+    // Schema text returned for sqlite_master lookups; FakeHost falls back to
+    // a generic CREATE TABLE statement when omitted.
+    schemaSql?: string
+}
+
+/**
+ * In-memory test harness implementing DumpEngineHost. Captures all data
+ * "uploaded" to R2 so we can assert on the produced dump.
+ */
+class FakeHost implements DumpEngineHost {
+    public parts: { partNumber: number; body: Uint8Array }[] = []
+    public completed = false
+    public aborted = false
+    // Pending-buffer objects keyed by their temp key.
+    public pending = new Map<string, Uint8Array>()
+    public savedStates = 0
+    public clockMs = 1_000_000
+
+    constructor(private tables: Table[]) {}
+
+    /**
+     * Answer the two query shapes handled here: sqlite_master lookups
+     * (table name taken from the first parameter) and
+     * `SELECT ... FROM "name" LIMIT n OFFSET m` pages. Anything else yields
+     * an empty cursor.
+     */
+    query(sqlText: string, params?: unknown[]): RowCursor {
+        const lower = sqlText.toLowerCase()
+        if (lower.includes('from sqlite_master')) {
+            const target = String(params?.[0] ?? '')
+            const t = this.tables.find((x) => x.name === target)
+            const rows = t
+                ? [
+                      {
+                          type: 'table',
+                          sql:
+                              t.schemaSql ??
+                              `CREATE TABLE ${t.name} (id INTEGER)`,
+                      },
+                  ]
+                : []
+            return cursorFromRows(['type', 'sql'], rows)
+        }
+        // SELECT * FROM "name" LIMIT N OFFSET M;
+        const m = sqlText.match(
+            /from\s+"([^"]+)"\s+limit\s+(\d+)\s+offset\s+(\d+)/i
+        )
+        if (!m) {
+            return cursorFromRows([], [])
+        }
+        const tableName = m[1].replace(/""/g, '"')
+        const limit = parseInt(m[2], 10)
+        const offset = parseInt(m[3], 10)
+        const t = this.tables.find((x) => x.name === tableName)
+        if (!t) return cursorFromRows([], [])
+        const slice = t.rows.slice(offset, offset + limit)
+        return cursorFromRows(t.columns, slice)
+    }
+
+    /** Count saves only; the state itself is not stored. */
+    async saveState(_state: DumpJobState): Promise<void> {
+        this.savedStates++
+    }
+
+    async uploadPart(
+        _uploadId: string,
+        _key: string,
+        partNumber: number,
+        body: Uint8Array
+    ): Promise<R2UploadedPart> {
+        // Copy so subsequent buffer mutations don't change recorded parts.
+        const copy = new Uint8Array(body.byteLength)
+        copy.set(body)
+        this.parts.push({ partNumber, body: copy })
+        return { partNumber, etag: `etag-${partNumber}` }
+    }
+
+    async completeUpload(): Promise<void> {
+        this.completed = true
+    }
+
+    async abortUpload(): Promise<void> {
+        this.aborted = true
+    }
+
+    async readPending(key: string): Promise<Uint8Array | null> {
+        return this.pending.get(key) ?? null
+    }
+
+    async writePending(key: string, bytes: Uint8Array): Promise<void> {
+        const copy = new Uint8Array(bytes.byteLength)
+        copy.set(bytes)
+        this.pending.set(key, copy)
+    }
+
+    async deletePending(key: string): Promise<void> {
+        this.pending.delete(key)
+    }
+
+    // Arrow-function property so tests can swap the clock per scenario.
+    now = (): number => this.clockMs
+
+    advance(ms: number): void {
+        this.clockMs += ms
+    }
+
+    /** Assemble all uploaded parts into a single string for assertions. */
+    fullOutput(): string {
+        const sorted = [...this.parts].sort(
+            (a, b) => a.partNumber - b.partNumber
+        )
+        const total = sorted.reduce((n, p) => n + p.body.byteLength, 0)
+        const merged = new Uint8Array(total)
+        let off = 0
+        for (const p of sorted) {
+            merged.set(p.body, off)
+            off += p.body.byteLength
+        }
+        return new TextDecoder().decode(merged)
+    }
+}
+
+/**
+ * Wrap an in-memory row list in the engine's RowCursor contract: next()
+ * hands out rows in order and returns null once they are exhausted.
+ */
+function cursorFromRows(
+    columns: string[],
+    rows: Record<string, unknown>[]
+): RowCursor {
+    let i = 0
+    return {
+        columns,
+        next() {
+            if (i >= rows.length) return null
+            return rows[i++]
+        },
+    }
+}
+
+/**
+ * Build a job state for tests: a fresh state for job 'test-job-id' with a
+ * fake uploadId, then any caller-supplied overrides applied on top.
+ */
+function makeState(
+    format: DumpFormat,
+    tables: string[],
+    overrides: Partial<DumpJobState> = {}
+): DumpJobState {
+    const state = newJobState('test-job-id', { format }, tables)
+    state.uploadId = 'upload-1'
+    Object.assign(state, overrides)
+    return state
+}
+
+// End-to-end scenarios for runTick, each driven through a FakeHost so the
+// produced dump can be read back with host.fullOutput().
+describe('streaming dump engine', () => {
+ it('emits a SQL dump with schema and INSERT statements in one tick', async () => {
+ const host = new FakeHost([
+ {
+ name: 'users',
+ columns: ['id', 'name'],
+ rows: [
+ { id: 1, name: 'Alice' },
+ { id: 2, name: "O'Brien" },
+ ],
+ schemaSql: 'CREATE TABLE users (id INTEGER, name TEXT)',
+ },
+ {
+ name: 'orders',
+ columns: ['id', 'total'],
+ rows: [{ id: 7, total: 99.5 }],
+ schemaSql: 'CREATE TABLE orders (id INTEGER, total REAL)',
+ },
+ ])
+ const state = makeState('sql', ['users', 'orders'])
+ const { done } = await runTick(state, host)
+
+ expect(done).toBe(true)
+ expect(host.completed).toBe(true)
+ expect(state.status).toBe('completed')
+ const out = host.fullOutput()
+ expect(out).toContain('CREATE TABLE users (id INTEGER, name TEXT);')
+ expect(out).toContain(
+ 'INSERT INTO "users" ("id", "name") VALUES (1, \'Alice\');'
+ )
+ // The embedded single quote must come out doubled.
+ expect(out).toContain(
+ 'INSERT INTO "users" ("id", "name") VALUES (2, \'O\'\'Brien\');'
+ )
+ expect(out).toContain('CREATE TABLE orders (id INTEGER, total REAL);')
+ expect(out).toContain(
+ 'INSERT INTO "orders" ("id", "total") VALUES (7, 99.5);'
+ )
+ expect(out).toContain('COMMIT;')
+ })
+
+ it('emits a CSV dump with per-table headers', async () => {
+ const host = new FakeHost([
+ {
+ name: 'users',
+ columns: ['id', 'name'],
+ rows: [
+ { id: 1, name: 'Alice' },
+ { id: 2, name: 'comma, name' },
+ ],
+ },
+ ])
+ const state = makeState('csv', ['users'])
+ await runTick(state, host)
+
+ const out = host.fullOutput()
+ expect(out).toContain('# table: users')
+ expect(out).toContain('id,name')
+ expect(out).toContain('1,Alice')
+ // A value containing a comma must be quoted.
+ expect(out).toContain('2,"comma, name"')
+ })
+
+ it('emits a JSON dump as a tables-keyed object', async () => {
+ const host = new FakeHost([
+ {
+ name: 'users',
+ columns: ['id', 'name'],
+ rows: [
+ { id: 1, name: 'Alice' },
+ { id: 2, name: 'Bob' },
+ ],
+ },
+ ])
+ const state = makeState('json', ['users'])
+ await runTick(state, host)
+
+ // The whole output must parse as a single JSON document.
+ const text = host.fullOutput().trim()
+ const parsed = JSON.parse(text)
+ expect(parsed.users).toEqual([
+ { id: 1, name: 'Alice' },
+ { id: 2, name: 'Bob' },
+ ])
+ })
+
+ it('handles empty databases by writing only the wrapper', async () => {
+ const host = new FakeHost([])
+ const state = makeState('sql', [])
+ const { done } = await runTick(state, host)
+ expect(done).toBe(true)
+ const out = host.fullOutput()
+ expect(out).toContain('PRAGMA foreign_keys=OFF;')
+ expect(out).toContain('COMMIT;')
+ expect(out).not.toContain('INSERT INTO')
+ })
+
+ it('handles tables with zero rows by still emitting CSV headers', async () => {
+ const host = new FakeHost([
+ { name: 'empty', columns: ['id', 'name'], rows: [] },
+ ])
+ const state = makeState('csv', ['empty'])
+ await runTick(state, host)
+ const out = host.fullOutput()
+ expect(out).toContain('# table: empty')
+ expect(out).toContain('id,name')
+ })
+
+ it('yields when the time budget is hit and resumes on the next tick', async () => {
+ // 25 rows, chunkSize=5. The host's `now` jumps past the deadline after
+ // a couple of chunks worth of timestamp reads so the first tick yields
+ // before draining the table.
+ const rows = Array.from({ length: 25 }, (_, i) => ({ id: i + 1 }))
+ const host = new FakeHost([
+ {
+ name: 'big',
+ columns: ['id'],
+ rows,
+ schemaSql: 'CREATE TABLE big (id INTEGER)',
+ },
+ ])
+ const baseClock = host.clockMs
+ let callCount = 0
+ host.now = () => {
+ callCount++
+ // Let several calls through at baseClock so the engine can emit a
+ // couple of chunks before we fast-forward past the deadline. The
+ // engine checks `now()` once per outer loop and once per row batch,
+ // so we need to let ~6 calls return baseClock before jumping.
+ if (callCount <= 6) return baseClock
+ return baseClock + 25_000
+ }
+ const state = makeState('sql', ['big'])
+ state.options.chunkSize = 5
+ const first = await runTick(state, host)
+ expect(first.done).toBe(false)
+ const rowsBeforeResume = state.progress.rowsDumped
+ expect(rowsBeforeResume).toBeGreaterThan(0)
+ expect(rowsBeforeResume).toBeLessThan(25)
+
+ // Resume with a stable clock — second tick should finish the job.
+ host.now = () => host.clockMs
+ const second = await runTick(state, host)
+ expect(second.done).toBe(true)
+ expect(state.status).toBe('completed')
+ expect(state.progress.rowsDumped).toBe(25)
+ const out = host.fullOutput()
+ for (let i = 1; i <= 25; i++) {
+ expect(out).toContain(`VALUES (${i});`)
+ }
+ })
+
+ it('flushes multipart parts when the buffer crosses 5 MiB', async () => {
+ // Produce one row whose string value alone is ~2 MiB so we cross the
+ // 5 MiB part threshold quickly with only a handful of rows.
+ const big = 'x'.repeat(2 * 1024 * 1024)
+ const rows = Array.from({ length: 4 }, (_, i) => ({
+ id: i + 1,
+ payload: big,
+ }))
+ const host = new FakeHost([
+ {
+ name: 'fat',
+ columns: ['id', 'payload'],
+ rows,
+ },
+ ])
+ const state = makeState('sql', ['fat'])
+ await runTick(state, host)
+ // We should have flushed at least one full part and the final part.
+ const totalBytes = host.parts.reduce((n, p) => n + p.body.byteLength, 0)
+ expect(host.parts.length).toBeGreaterThanOrEqual(2)
+ expect(totalBytes).toBeGreaterThan(R2_MIN_PART_SIZE)
+ expect(host.completed).toBe(true)
+ })
+
+ it('marks the job failed and aborts the upload on engine error', async () => {
+ const host = new FakeHost([
+ {
+ name: 'broken',
+ columns: ['id'],
+ rows: [{ id: 1 }],
+ },
+ ])
+ // Force completeUpload to fail so we exercise the error path.
+ host.completeUpload = async () => {
+ throw new Error('boom')
+ }
+ const state = makeState('sql', ['broken'])
+ await expect(runTick(state, host)).rejects.toThrow('boom')
+ expect(state.status).toBe('failed')
+ expect(state.error).toBe('boom')
+ expect(host.aborted).toBe(true)
+ })
+
+ it("emits BLOB values as x'...' literals in SQL dumps", async () => {
+ const blob = new Uint8Array([0xde, 0xad, 0xbe, 0xef])
+ const host = new FakeHost([
+ {
+ name: 'bin',
+ columns: ['id', 'data'],
+ rows: [{ id: 1, data: blob }],
+ },
+ ])
+ const state = makeState('sql', ['bin'])
+ await runTick(state, host)
+ const out = host.fullOutput()
+ expect(out).toContain("x'deadbeef'")
+ })
+})
diff --git a/src/export/dump-engine.ts b/src/export/dump-engine.ts
new file mode 100644
index 0000000..81bb951
--- /dev/null
+++ b/src/export/dump-engine.ts
@@ -0,0 +1,497 @@
+/**
+ * Engine that drives a streaming dump through one or more DO invocations.
+ *
+ * Pulled out of the DO class itself so the logic can be unit-tested without
+ * spinning up the full Durable Object harness. The engine is stateless across
+ * ticks — all progress lives in DumpJobState which is loaded/persisted by the
+ * DO between invocations.
+ *
+ * Buffer strategy:
+ * - Within a tick, the pending bytes live in a Uint8Array in instance memory.
+ * - When the buffer reaches the R2 multipart minimum (5 MiB) it is flushed
+ * to R2 as the next part.
+ * - When a tick yields (deadline reached, work remaining), the leftover bytes
+ * are written to a temporary R2 object so the next tick can resume them.
+ * - When the job completes, the leftover bytes — however small — are flushed
+ * as the final multipart part, and the multipart upload is completed.
+ */
+
+import {
+ DumpJobState,
+ DEFAULT_CHUNK_SIZE,
+ R2_MIN_PART_SIZE,
+ TICK_BUDGET_MS,
+ csvCell,
+ quoteIdent,
+ sqlLiteral,
+} from './streaming-dump'
+
+/**
+ * Minimal cursor-like interface so the engine can be tested without depending
+ * on the Cloudflare SqlStorage runtime.
+ */
+export interface RowCursor {
+    /** Column names of the result set. */
+    columns: string[]
+    /** Return the next row keyed by column name, or `null` at the end. */
+    next(): Record<string, unknown> | null
+    /** Optional hook for releasing the cursor; hosts may omit it. */
+    close?(): void
+}
+
+/**
+ * The narrow runtime contract the engine needs from its host. The DO supplies
+ * a concrete implementation that reaches into ctx.storage.sql, the R2 bucket
+ * binding, and DO storage for persistence.
+ */
+export interface DumpEngineHost {
+    /** Run a SELECT and stream rows back. */
+    query(sql: string, params?: unknown[]): RowCursor
+    /** Persist the job state so the next tick / status read can see it. */
+    saveState(state: DumpJobState): Promise<void>
+    /** Upload a multipart segment and return the part metadata. */
+    uploadPart(
+        uploadId: string,
+        key: string,
+        partNumber: number,
+        body: Uint8Array
+    ): Promise<R2UploadedPart>
+    /** Finalize the multipart upload once all parts are recorded. */
+    completeUpload(
+        uploadId: string,
+        key: string,
+        parts: R2UploadedPart[]
+    ): Promise<void>
+    /** Abort the multipart upload — invoked when the job fails. */
+    abortUpload(uploadId: string, key: string): Promise<void>
+    /** Read the pending buffer carried over from a previous tick. */
+    readPending(key: string): Promise<Uint8Array | null>
+    /** Persist the pending buffer for the next tick to resume from. */
+    writePending(key: string, bytes: Uint8Array): Promise<void>
+    /** Clear the pending buffer after final flush. */
+    deletePending(key: string): Promise<void>
+    /** Optional clock override for tests; Date.now is used when absent. */
+    now?(): number
+}
+
+// Shared UTF-8 encoder used by writeStr for every string appended to the buffer.
+const encoder = new TextEncoder()
+
+/**
+ * Growable byte buffer that lives for the duration of one tick. Bytes are
+ * appended at the end of a backing Uint8Array whose capacity doubles
+ * whenever an append would not fit.
+ */
+class TickBuffer {
+    private buf: Uint8Array
+    private len: number
+
+    constructor(initial: Uint8Array = new Uint8Array()) {
+        // Start a little above the multipart threshold so a normal tick
+        // rarely has to grow the backing array.
+        const seedLength = initial.byteLength
+        const baseline = R2_MIN_PART_SIZE + 64 * 1024
+        const capacity = seedLength > baseline ? seedLength : baseline
+        const backing = new Uint8Array(capacity)
+        backing.set(initial, 0)
+        this.buf = backing
+        this.len = seedLength
+    }
+
+    /** Number of bytes currently held. */
+    get length(): number {
+        return this.len
+    }
+
+    /** Live window over the held bytes; invalid once the buffer is appended to. */
+    view(): Uint8Array {
+        return this.buf.subarray(0, this.len)
+    }
+
+    /** Hand back an owned copy of the held bytes and empty the buffer. */
+    drain(): Uint8Array {
+        const snapshot = this.buf.slice(0, this.len)
+        this.len = 0
+        return snapshot
+    }
+
+    /** Add `bytes` at the end, doubling the capacity until they fit. */
+    append(bytes: Uint8Array): void {
+        const required = this.len + bytes.byteLength
+        let capacity = this.buf.byteLength
+        if (required > capacity) {
+            do {
+                capacity *= 2
+            } while (capacity < required)
+            const resized = new Uint8Array(capacity)
+            resized.set(this.view(), 0)
+            this.buf = resized
+        }
+        this.buf.set(bytes, this.len)
+        this.len = required
+    }
+}
+
+/**
+ * Upload buffered bytes as multipart parts once at least one full part is
+ * available; a no-op while the buffer is below R2_MIN_PART_SIZE.
+ *
+ * R2 requires every part except the last to have the same size, so the
+ * buffer is cut into parts of exactly R2_MIN_PART_SIZE bytes and whatever is
+ * left over is put back for a later flush.
+ *
+ * @param state - Job state; `parts` and `progress.partsUploaded` are updated
+ *   for each uploaded part.
+ * @param host - Host used to upload the parts.
+ * @param buffer - Tick buffer to flush from.
+ * @throws If the job has no uploadId, or if the host's upload fails.
+ */
+async function maybeFlushPart(
+    state: DumpJobState,
+    host: DumpEngineHost,
+    buffer: TickBuffer
+): Promise<void> {
+    if (buffer.length < R2_MIN_PART_SIZE) return
+    if (!state.uploadId)
+        throw new Error('maybeFlushPart called without an active upload')
+    const bytes = buffer.drain()
+    let offset = 0
+    while (bytes.byteLength - offset >= R2_MIN_PART_SIZE) {
+        const slice = bytes.subarray(offset, offset + R2_MIN_PART_SIZE)
+        const partNumber = state.parts.length + 1
+        const part = await host.uploadPart(
+            state.uploadId,
+            state.objectKey,
+            partNumber,
+            slice
+        )
+        state.parts.push(part)
+        state.progress.partsUploaded = state.parts.length
+        offset += R2_MIN_PART_SIZE
+    }
+    // Keep the tail (always shorter than one part) for the next flush.
+    if (offset < bytes.byteLength) {
+        buffer.append(bytes.subarray(offset))
+    }
+}
+
+/**
+ * Upload everything left in the buffer at the end of the job.
+ *
+ * R2 requires all non-trailing parts to share one size and the trailing part
+ * to be no larger than them. Leftover bytes beyond R2_MIN_PART_SIZE are
+ * therefore sent as full-size parts first, and only the remainder (at most
+ * R2_MIN_PART_SIZE bytes) becomes the trailing part. Does nothing when the
+ * buffer is empty.
+ *
+ * @param state - Job state; `parts` and `progress.partsUploaded` are updated
+ *   for each uploaded part.
+ * @param host - Host used to upload the parts.
+ * @param buffer - Tick buffer to flush; empty afterwards.
+ * @throws If the job has no uploadId, or if the host's upload fails.
+ */
+async function flushFinalPart(
+    state: DumpJobState,
+    host: DumpEngineHost,
+    buffer: TickBuffer
+): Promise<void> {
+    if (!state.uploadId)
+        throw new Error('flushFinalPart called without an active upload')
+    if (buffer.length === 0) return
+    const bytes = buffer.drain()
+    let offset = 0
+    while (offset < bytes.byteLength) {
+        // Strictly-greater check: a remainder of exactly one part size is
+        // uploaded as the trailing part rather than leaving an empty one.
+        const remaining = bytes.byteLength - offset
+        const size =
+            remaining > R2_MIN_PART_SIZE ? R2_MIN_PART_SIZE : remaining
+        const partNumber = state.parts.length + 1
+        const part = await host.uploadPart(
+            state.uploadId,
+            state.objectKey,
+            partNumber,
+            bytes.subarray(offset, offset + size)
+        )
+        state.parts.push(part)
+        state.progress.partsUploaded = state.parts.length
+        offset += size
+    }
+}
+
+/**
+ * Encode `str` as UTF-8, add it to the tick buffer, and count the encoded
+ * bytes towards the job's bytesWritten progress.
+ */
+function writeStr(state: DumpJobState, buffer: TickBuffer, str: string): void {
+    const encoded = encoder.encode(str)
+    buffer.append(encoded)
+    const { byteLength } = encoded
+    state.progress.bytesWritten += byteLength
+}
+
+/**
+ * Build the SELECT statement that reads one page of `table`: `chunkSize`
+ * rows starting at row `offset`.
+ */
+function pageSql(table: string, chunkSize: number, offset: number): string {
+    // No ORDER BY on purpose: sorting an unindexed table would mean a full
+    // sort per page. The dump relies on SQLite's natural row order instead,
+    // which is expected to stay stable while the DO has the database to
+    // itself.
+    const source = quoteIdent(table)
+    const page = `LIMIT ${chunkSize} OFFSET ${offset}`
+    return `SELECT * FROM ${source} ${page};`
+}
+
+/**
+ * Execute a single tick of a dump job.
+ *
+ * Walks the job through its phases (header, then schema and rows per table,
+ * then finalize) until either the dump is finished or the tick's time budget
+ * (TICK_BUDGET_MS) runs out. `state` is mutated in place; the caller is
+ * responsible for persisting it and re-arming alarms based on the result.
+ *
+ * @param state - Job state to advance.
+ * @param host - Runtime services (queries, R2 upload, pending buffer, clock).
+ * @returns `{ done: true }` once nothing is left to do, `{ done: false }`
+ *   when the tick yielded and another tick is needed.
+ * @throws Re-throws any error raised while processing, after marking the job
+ *   failed, aborting the upload and deleting the pending buffer.
+ */
+export async function runTick(
+    state: DumpJobState,
+    host: DumpEngineHost
+): Promise<{ done: boolean }> {
+    // A job that already finished or failed must not be driven again:
+    // without this guard its status would be reset to 'processing' and a
+    // failed job would start emitting tables into an aborted upload.
+    if (state.phase === 'done' || state.phase === 'error') {
+        return { done: true }
+    }
+
+    // Call through `host` so a clock implemented as a method keeps its `this`.
+    const now = (): number => (host.now ? host.now() : Date.now())
+    const deadline = now() + TICK_BUDGET_MS
+    state.status = 'processing'
+
+    // Hydrate the pending buffer from R2 if a previous tick left one behind.
+    const initial = state.pendingTempKey
+        ? ((await host.readPending(state.pendingTempKey)) ?? new Uint8Array())
+        : new Uint8Array()
+    const buffer = new TickBuffer(initial)
+
+    try {
+        if (state.phase === 'header') {
+            if (state.format === 'sql') {
+                writeStr(
+                    state,
+                    buffer,
+                    '-- StarbaseDB streaming dump\n' +
+                        `-- Job: ${state.jobId}\n` +
+                        // Use the host clock so the timestamp agrees with
+                        // the progress timestamps.
+                        `-- Generated: ${new Date(now()).toISOString()}\n` +
+                        'PRAGMA foreign_keys=OFF;\n' +
+                        'BEGIN TRANSACTION;\n\n'
+                )
+            } else if (state.format === 'json') {
+                writeStr(state, buffer, '{\n')
+            }
+            state.phase =
+                state.progress.tables.length === 0 ? 'finalize' : 'schema'
+            state.progress.updatedAt = now()
+        }
+
+        while (state.phase !== 'finalize' && state.phase !== 'done') {
+            if (now() >= deadline) {
+                return await yieldTick(state, host, buffer, now())
+            }
+
+            const idx = state.progress.currentTableIndex
+            if (idx >= state.progress.tables.length) {
+                state.phase = 'finalize'
+                break
+            }
+            const table = state.progress.tables[idx]
+            state.progress.currentTable = table
+
+            if (state.phase === 'schema') {
+                await emitSchema(state, host, buffer, table, idx)
+                state.schemaEmitted = true
+                state.phase = 'rows'
+                state.currentColumns = null
+                state.jsonRowWritten = false
+            }
+
+            if (state.phase === 'rows') {
+                // emitRowBatches may flush parts mid-loop. It resolves to
+                // false when time is up and to true once the table is
+                // exhausted.
+                const tableDone = await emitRowBatches(
+                    state,
+                    host,
+                    buffer,
+                    table,
+                    deadline,
+                    now
+                )
+                if (!tableDone) {
+                    return await yieldTick(state, host, buffer, now())
+                }
+                // Close JSON array for this table.
+                if (state.format === 'json' && state.jsonTableOpened) {
+                    writeStr(
+                        state,
+                        buffer,
+                        state.jsonRowWritten ? '\n ]' : ']'
+                    )
+                    state.jsonTableOpened = false
+                }
+                // Maybe flush after each table boundary so memory stays bounded
+                // even if the next table is much larger.
+                await maybeFlushPart(state, host, buffer)
+            }
+
+            // Advance to the next table.
+            state.progress.currentTableIndex = idx + 1
+            state.progress.rowOffset = 0
+            state.schemaEmitted = false
+            state.currentColumns = null
+            state.jsonTableOpened = false
+            state.jsonRowWritten = false
+            state.phase = 'schema'
+        }
+
+        if (state.phase === 'finalize') {
+            if (state.format === 'sql') {
+                writeStr(state, buffer, '\nCOMMIT;\n')
+            } else if (state.format === 'json') {
+                writeStr(state, buffer, '\n}\n')
+            }
+            await flushFinalPart(state, host, buffer)
+            if (state.uploadId) {
+                await host.completeUpload(
+                    state.uploadId,
+                    state.objectKey,
+                    state.parts
+                )
+            }
+            if (state.pendingTempKey) {
+                // Best-effort cleanup; the dump itself is already complete.
+                await host.deletePending(state.pendingTempKey).catch(() => {})
+                state.pendingTempKey = undefined
+                state.pendingBufferBytes = 0
+            }
+            state.phase = 'done'
+            state.status = 'completed'
+            state.progress.completedAt = now()
+            state.progress.updatedAt = now()
+            return { done: true }
+        }
+
+        state.progress.updatedAt = now()
+        return { done: false }
+    } catch (err) {
+        state.error = err instanceof Error ? err.message : String(err)
+        state.status = 'failed'
+        state.phase = 'error'
+        state.progress.updatedAt = now()
+        // Cleanup is best-effort so the original error is the one re-thrown.
+        if (state.uploadId) {
+            await host
+                .abortUpload(state.uploadId, state.objectKey)
+                .catch(() => {})
+        }
+        if (state.pendingTempKey) {
+            await host.deletePending(state.pendingTempKey).catch(() => {})
+            state.pendingTempKey = undefined
+            state.pendingBufferBytes = 0
+        }
+        throw err
+    }
+}
+
+/**
+ * Hand control back to the alarm scheduler: stash whatever is still buffered
+ * in a temporary R2 object for the next tick and stamp the progress time.
+ */
+async function yieldTick(
+    state: DumpJobState,
+    host: DumpEngineHost,
+    buffer: TickBuffer,
+    nowMs: number
+): Promise<{ done: boolean }> {
+    const leftover = buffer.drain()
+    if (leftover.byteLength > 0) {
+        const tempKey = state.pendingTempKey ?? `dumps/.pending/${state.jobId}.tmp`
+        await host.writePending(tempKey, leftover)
+        state.pendingTempKey = tempKey
+        state.pendingBufferBytes = leftover.byteLength
+    } else if (state.pendingTempKey) {
+        // Nothing left to carry over, so an earlier tick's temp object is stale.
+        await host.deletePending(state.pendingTempKey).catch(() => {})
+        state.pendingTempKey = undefined
+        state.pendingBufferBytes = 0
+    }
+    state.progress.updatedAt = nowMs
+    return { done: false }
+}
+
+/**
+ * Write the per-table prelude for `table` into the tick buffer.
+ *
+ * - SQL: a `-- Table:` comment line followed by every non-empty `sql` entry
+ *   that sqlite_master holds for the table (type table/index/trigger/view),
+ *   each terminated with `;`.
+ * - CSV: a `# table:` marker line, preceded by a blank line for every table
+ *   after the first.
+ * - JSON: the opening `"<table>": [` of the table's array, preceded by `,` for
+ *   every table after the first; the JSON bookkeeping flags are reset.
+ *
+ * Line breaks in the table name are replaced with a space in the SQL and CSV
+ * marker lines so that the name cannot end the marker line early and leak the
+ * remainder into the dump as live content.
+ */
+async function emitSchema(
+    state: DumpJobState,
+    host: DumpEngineHost,
+    buffer: TickBuffer,
+    table: string,
+    tableIndex: number
+): Promise<void> {
+    // Single-line rendering of the name for use inside marker lines.
+    const label = table.replace(/[\r\n]+/g, ' ')
+
+    if (state.format === 'sql') {
+        const cursor = host.query(
+            `SELECT type, sql FROM sqlite_master WHERE tbl_name = ? AND sql IS NOT NULL AND type IN ('table','index','trigger','view');`,
+            [table]
+        )
+        const stmts: string[] = []
+        try {
+            while (true) {
+                const row = cursor.next()
+                if (!row) break
+                const sql = String(row.sql ?? '').trim()
+                if (!sql) continue
+                stmts.push(`${sql};\n`)
+            }
+        } finally {
+            // close() is optional on the cursor, hence the optional call.
+            cursor.close?.()
+        }
+        writeStr(state, buffer, `-- Table: ${label}\n` + stmts.join('') + '\n')
+    } else if (state.format === 'csv') {
+        if (tableIndex > 0) writeStr(state, buffer, '\n')
+        writeStr(state, buffer, `# table: ${label}\n`)
+    } else if (state.format === 'json') {
+        if (tableIndex > 0) writeStr(state, buffer, ',\n')
+        writeStr(state, buffer, ` ${JSON.stringify(table)}: [`)
+        state.jsonTableOpened = true
+        state.jsonRowWritten = false
+    }
+}
+
+/**
+ * Stream the rows of `table` into the buffer one page at a time.
+ *
+ * Each iteration checks the deadline, queries the next page starting at
+ * `state.progress.rowOffset`, serializes its rows, gives `maybeFlushPart` a
+ * chance to upload, and saves the job state before fetching another page.
+ *
+ * @returns `true` once a page comes back with fewer than `chunkSize` rows
+ *   (table exhausted), `false` when the deadline is reached first.
+ */
+async function emitRowBatches(
+    state: DumpJobState,
+    host: DumpEngineHost,
+    buffer: TickBuffer,
+    table: string,
+    deadline: number,
+    now: () => number
+): Promise<boolean> {
+    // Only a positive integer is a usable page size. With zero, a negative
+    // number or NaN the `rowsThisBatch < chunkSize` exit test below could never
+    // succeed and the table would never be reported as drained.
+    const requested = Number(state.options.chunkSize)
+    const chunkSize =
+        Number.isInteger(requested) && requested > 0
+            ? requested
+            : DEFAULT_CHUNK_SIZE
+
+    while (true) {
+        if (now() >= deadline) return false
+
+        const cursor = host.query(
+            pageSql(table, chunkSize, state.progress.rowOffset)
+        )
+
+        let rowsThisBatch = 0
+        try {
+            if (!state.currentColumns) {
+                state.currentColumns = cursor.columns
+                // Emit the CSV header on the very first batch of this table.
+                // Done inside the try so the cursor is closed if this throws.
+                if (state.format === 'csv') {
+                    writeStr(
+                        state,
+                        buffer,
+                        state.currentColumns.map(csvCell).join(',') + '\n'
+                    )
+                }
+            }
+
+            while (true) {
+                const row = cursor.next()
+                if (!row) break
+                emitRow(state, buffer, table, row)
+                rowsThisBatch++
+                state.progress.rowsDumped++
+                state.progress.rowOffset++
+            }
+        } finally {
+            cursor.close?.()
+        }
+
+        // Flush whatever crossed the 5 MiB line during this chunk.
+        await maybeFlushPart(state, host, buffer)
+
+        if (rowsThisBatch < chunkSize) {
+            // A short page means the table is fully drained.
+            if (state.format === 'sql') {
+                writeStr(state, buffer, '\n')
+            }
+            return true
+        }
+
+        // Persist progress, then loop and continue with the next chunk.
+        // NOTE(review): bytes still sitting in `buffer` are not persisted here;
+        // confirm that resuming from the saved offset after a crash cannot
+        // drop rows that were buffered but not yet uploaded.
+        await host.saveState(state)
+    }
+}
+
+/**
+ * Serialize a single row into the active format and append it to the buffer.
+ *
+ * - SQL: one `INSERT INTO ... VALUES (...)` statement per row.
+ * - CSV: one comma-joined line per row.
+ * - JSON: the row object, prefixed with the separator that fits its position
+ *   in the current table's array; marks the array as non-empty.
+ *
+ * Column order comes from `state.currentColumns`; the row's own keys are used
+ * as a fallback should the cache be unset.
+ */
+function emitRow(
+    state: DumpJobState,
+    buffer: TickBuffer,
+    table: string,
+    row: Record<string, unknown>
+): void {
+    const cols = state.currentColumns ?? Object.keys(row)
+
+    if (state.format === 'sql') {
+        const values = cols.map((c) => sqlLiteral(row[c]))
+        const colList = cols.map(quoteIdent).join(', ')
+        writeStr(
+            state,
+            buffer,
+            `INSERT INTO ${quoteIdent(table)} (${colList}) VALUES (${values.join(', ')});\n`
+        )
+        return
+    }
+
+    if (state.format === 'csv') {
+        writeStr(
+            state,
+            buffer,
+            cols.map((c) => csvCell(row[c])).join(',') + '\n'
+        )
+        return
+    }
+
+    if (state.format === 'json') {
+        const prefix = state.jsonRowWritten ? ',\n ' : '\n '
+        // JSON.stringify() throws on bigint; emit such values as decimal
+        // strings so one wide integer cannot fail the whole job.
+        const json = JSON.stringify(row, (_key, value) =>
+            typeof value === 'bigint' ? value.toString() : value
+        )
+        writeStr(state, buffer, prefix + json)
+        state.jsonRowWritten = true
+    }
+}
diff --git a/src/export/dump.ts b/src/export/dump.ts
index 91a2e89..ef17a58 100644
--- a/src/export/dump.ts
+++ b/src/export/dump.ts
@@ -1,8 +1,32 @@
+/**
+ * /export/dump entry points.
+ *
+ * Two execution paths exist behind these routes:
+ *
+ * 1. Legacy synchronous path (preserved for backwards compatibility):
+ * buffers the entire dump into memory and returns it inline. This still
+ * works for small databases and clients hitting GET /export/dump with
+ * the old contract.
+ *
+ * 2. Streaming path (new): when the DATABASE_DUMPS R2 binding is wired and
+ * the caller opts in by sending POST /export/dump, the
+ * job is handed off to the Durable Object which paginates rows, flushes
+ * chunks into R2, and uses an alarm to survive past the 30-second worker
+ * limit. The response contains a jobId the client can poll until the
+ * object is fully written.
+ */
+
import { executeOperation } from '.'
import { StarbaseDBConfiguration } from '../handler'
import { DataSource } from '../types'
import { createResponse } from '../utils'
+import { DumpFormat, DumpJobOptions, DumpJobStatusView } from './streaming-dump'
+/**
+ * Legacy in-memory SQL dump. Retained for very small databases and existing
+ * clients that depend on the original `GET /export/dump` contract. Internal
+ * databases that exceed a few MB should use the streaming endpoint instead.
+ */
export async function dumpDatabaseRoute(
dataSource: DataSource,
config: StarbaseDBConfiguration
@@ -69,3 +93,209 @@ export async function dumpDatabaseRoute(
return createResponse(undefined, 'Failed to create database dump', 500)
}
}
+
+// -- Streaming entry points -----------------------------------------------
+
+/** Optional body for POST /export/dump. All fields are optional. */
+export interface StartDumpRequestBody {
+ format?: DumpFormat
+ callbackUrl?: string
+ table?: string
+ chunkSize?: number
+}
+
+function parseFormat(value: string | null | undefined): DumpFormat {
+    const known: readonly DumpFormat[] = ['sql', 'csv', 'json']
+    const wanted = String(value ?? 'sql').toLowerCase()
+    return known.find((fmt) => fmt === wanted) ?? 'sql' // unknown formats fall back to SQL
+}
+
+/**
+ * Start a streaming dump job. Returns 202 with the job descriptor plus
+ * `statusUrl` / `downloadUrl`; the caller polls `/export/dump/status/:jobId`
+ * and downloads the finished artifact from `/export/dump/download/:jobId`.
+ *
+ * Options come from the JSON body when one is sent; `format`, `callbackUrl`
+ * and `table` fall back to the query string. An unrecognized `format` is
+ * treated as `sql`.
+ *
+ * Error responses: 400 for a non-internal data source, an unparsable JSON
+ * body, or a `chunkSize` that is not a positive integer; 500 when starting
+ * the job throws.
+ */
+export async function startStreamingDumpRoute(
+    request: Request,
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration
+): Promise<Response> {
+    try {
+        if (dataSource.source !== 'internal' || !dataSource.rpc.startDumpJob) {
+            return createResponse(
+                undefined,
+                'Streaming dump is only available for the internal data source.',
+                400
+            )
+        }
+
+        const url = new URL(request.url)
+        let body: StartDumpRequestBody = {}
+        if (request.method !== 'GET') {
+            const contentType =
+                request.headers.get('Content-Type')?.toLowerCase() ?? ''
+            if (contentType.includes('application/json')) {
+                try {
+                    body =
+                        ((await request.json()) as StartDumpRequestBody) ?? {}
+                } catch {
+                    return createResponse(undefined, 'Invalid JSON body.', 400)
+                }
+            }
+        }
+
+        // `chunkSize` arrives as untyped JSON. Anything other than a positive
+        // integer is rejected here instead of being handed to the dump job.
+        const rawChunkSize: unknown = body.chunkSize
+        let chunkSize: number | undefined
+        if (rawChunkSize !== undefined && rawChunkSize !== null) {
+            if (
+                typeof rawChunkSize !== 'number' ||
+                !Number.isInteger(rawChunkSize) ||
+                rawChunkSize <= 0
+            ) {
+                return createResponse(
+                    undefined,
+                    '`chunkSize` must be a positive integer.',
+                    400
+                )
+            }
+            chunkSize = rawChunkSize
+        }
+
+        const options: DumpJobOptions = {
+            format: parseFormat(body.format ?? url.searchParams.get('format')),
+            callbackUrl:
+                body.callbackUrl ??
+                url.searchParams.get('callbackUrl') ??
+                undefined,
+            table: body.table ?? url.searchParams.get('table') ?? undefined,
+            chunkSize,
+        }
+
+        const status = await dataSource.rpc.startDumpJob(options)
+
+        const baseUrl = `${url.origin}`
+        const statusUrl = `${baseUrl}/export/dump/status/${status.jobId}`
+        const downloadUrl = `${baseUrl}/export/dump/download/${status.jobId}`
+
+        return new Response(
+            JSON.stringify({
+                result: {
+                    ...status,
+                    statusUrl,
+                    downloadUrl,
+                },
+                error: undefined,
+            }),
+            {
+                status: 202,
+                headers: {
+                    'Content-Type': 'application/json',
+                    // Point clients at the status resource for polling.
+                    Location: statusUrl,
+                },
+            }
+        )
+    } catch (error: unknown) {
+        console.error('Start streaming dump error:', error)
+        const message = (error as { message?: string } | null | undefined)
+            ?.message
+        return createResponse(
+            undefined,
+            message ?? 'Failed to start dump job.',
+            500
+        )
+    }
+}
+
+/**
+ * GET /export/dump/status/:jobId — returns the job status view.
+ *
+ * Responds 400 when the data source exposes no `getDumpJob` RPC, 404 when the
+ * RPC yields no job for `jobId`, and 500 when the lookup throws. A
+ * `downloadUrl` (built from the request origin) is attached only while the
+ * job's status is `completed`.
+ */
+export async function getDumpJobStatusRoute(
+    jobId: string,
+    request: Request,
+    dataSource: DataSource
+): Promise<Response> {
+    try {
+        if (!dataSource.rpc.getDumpJob) {
+            return createResponse(undefined, 'Not supported.', 400)
+        }
+        const view = (await dataSource.rpc.getDumpJob(
+            jobId
+        )) as DumpJobStatusView | null
+        if (!view) {
+            return createResponse(undefined, `Job '${jobId}' not found.`, 404)
+        }
+        const url = new URL(request.url)
+        // Left undefined (and therefore omitted from the JSON) until completion.
+        const downloadUrl =
+            view.status === 'completed'
+                ? `${url.origin}/export/dump/download/${jobId}`
+                : undefined
+        return new Response(
+            JSON.stringify({
+                result: { ...view, downloadUrl },
+                error: undefined,
+            }),
+            {
+                status: 200,
+                headers: { 'Content-Type': 'application/json' },
+            }
+        )
+    } catch (error: unknown) {
+        console.error('Get dump job status error:', error)
+        const message = (error as { message?: string } | null | undefined)
+            ?.message
+        return createResponse(
+            undefined,
+            message ?? 'Failed to fetch job status.',
+            500
+        )
+    }
+}
+
+/**
+ * GET /export/dump/download/:jobId — streams the finished file from R2.
+ *
+ * Responds 400 when the data source exposes no `getDumpDownloadBody` RPC, 404
+ * when the RPC yields nothing for `jobId`, and 500 when the lookup throws.
+ * On success the body is passed through as an attachment with the content
+ * type, length and filename reported by the RPC.
+ */
+export async function downloadDumpJobRoute(
+    jobId: string,
+    dataSource: DataSource
+): Promise<Response> {
+    try {
+        if (!dataSource.rpc.getDumpDownloadBody) {
+            return createResponse(undefined, 'Not supported.', 400)
+        }
+        const result = (await dataSource.rpc.getDumpDownloadBody(jobId)) as {
+            body: ReadableStream
+            size: number
+            contentType: string
+            filename: string
+        } | null
+        if (!result) {
+            return createResponse(
+                undefined,
+                `Dump for job '${jobId}' is not available yet.`,
+                404
+            )
+        }
+        // Keep the quoted filename well formed: quotes, backslashes and line
+        // breaks would otherwise corrupt the Content-Disposition header.
+        const safeName = String(result.filename).replace(/["\\\r\n]/g, '_')
+        return new Response(result.body, {
+            status: 200,
+            headers: {
+                'Content-Type': result.contentType,
+                'Content-Length': String(result.size),
+                'Content-Disposition': `attachment; filename="${safeName}"`,
+            },
+        })
+    } catch (error: unknown) {
+        console.error('Download dump job error:', error)
+        const message = (error as { message?: string } | null | undefined)
+            ?.message
+        return createResponse(
+            undefined,
+            message ?? 'Failed to download dump.',
+            500
+        )
+    }
+}
+
+/**
+ * DELETE /export/dump/:jobId — cancel an in-flight job.
+ *
+ * Responds 400 when the data source exposes no `cancelDumpJob` RPC, 404 when
+ * the RPC yields no job for `jobId`, 500 when the call throws, and otherwise
+ * 200 with the status view returned by the RPC.
+ */
+export async function cancelDumpJobRoute(
+    jobId: string,
+    dataSource: DataSource
+): Promise<Response> {
+    try {
+        if (!dataSource.rpc.cancelDumpJob) {
+            return createResponse(undefined, 'Not supported.', 400)
+        }
+        const view = (await dataSource.rpc.cancelDumpJob(
+            jobId
+        )) as DumpJobStatusView | null
+        if (!view) {
+            return createResponse(undefined, `Job '${jobId}' not found.`, 404)
+        }
+        return new Response(
+            JSON.stringify({ result: view, error: undefined }),
+            {
+                status: 200,
+                headers: { 'Content-Type': 'application/json' },
+            }
+        )
+    } catch (error: unknown) {
+        console.error('Cancel dump job error:', error)
+        const message = (error as { message?: string } | null | undefined)
+            ?.message
+        return createResponse(
+            undefined,
+            message ?? 'Failed to cancel job.',
+            500
+        )
+    }
+}
diff --git a/src/export/streaming-dump-route.test.ts b/src/export/streaming-dump-route.test.ts
new file mode 100644
index 0000000..4a4a8e9
--- /dev/null
+++ b/src/export/streaming-dump-route.test.ts
@@ -0,0 +1,212 @@
+/**
+ * Smoke tests for the streaming dump HTTP entry points in src/export/dump.ts.
+ * The real DO RPC is mocked — these tests just confirm parameter parsing,
+ * status code shape, and routing of the call into the RPC surface.
+ */
+
+import { describe, expect, it, vi, beforeEach } from 'vitest'
+import {
+ cancelDumpJobRoute,
+ downloadDumpJobRoute,
+ getDumpJobStatusRoute,
+ startStreamingDumpRoute,
+} from './dump'
+import type { DataSource } from '../types'
+import type { StarbaseDBConfiguration } from '../handler'
+
+/**
+ * Build a stand-in for the Durable Object RPC surface used by the routes.
+ * `startDumpJob` resolves to a queued job and `getDumpJob` to the same job
+ * mid-flight; entries in `overrides` replace the default stubs.
+ */
+function makeRpcMock(overrides: Record<string, unknown> = {}) {
+    // Fields the two canned status views have in common.
+    const job = {
+        jobId: 'job-abc',
+        format: 'sql',
+        objectKey: 'dumps/dump.sql',
+    }
+    const queuedProgress = {
+        tables: ['users'],
+        currentTableIndex: 0,
+        currentTable: 'users',
+        rowOffset: 0,
+        rowsDumped: 0,
+        bytesWritten: 0,
+        partsUploaded: 0,
+        startedAt: 1,
+        updatedAt: 1,
+    }
+    return {
+        executeQuery: vi.fn(),
+        startDumpJob: vi.fn().mockResolvedValue({
+            ...job,
+            status: 'queued',
+            progress: queuedProgress,
+        }),
+        getDumpJob: vi.fn().mockResolvedValue({
+            ...job,
+            status: 'processing',
+            progress: {
+                ...queuedProgress,
+                tables: ['users'],
+                rowsDumped: 100,
+                bytesWritten: 4096,
+                updatedAt: 2,
+            },
+        }),
+        getDumpDownloadBody: vi.fn(),
+        cancelDumpJob: vi.fn(),
+        ...overrides,
+    }
+}
+
+let dataSource: DataSource
+let config: StarbaseDBConfiguration
+
+beforeEach(() => {
+ dataSource = {
+ source: 'internal',
+ rpc: makeRpcMock() as any,
+ } as any
+ config = {
+ role: 'admin',
+ features: { export: true },
+ }
+})
+
+describe('streaming dump HTTP routes', () => {
+ it('startStreamingDumpRoute returns 202 with statusUrl/downloadUrl', async () => {
+ const req = new Request('https://api.example.com/export/dump', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ format: 'sql' }),
+ })
+ const res = await startStreamingDumpRoute(req, dataSource, config)
+ expect(res.status).toBe(202)
+ expect(res.headers.get('Location')).toContain(
+ '/export/dump/status/job-abc'
+ )
+ const json: any = await res.json()
+ expect(json.result.jobId).toBe('job-abc')
+ expect(json.result.statusUrl).toBe(
+ 'https://api.example.com/export/dump/status/job-abc'
+ )
+ expect(json.result.downloadUrl).toBe(
+ 'https://api.example.com/export/dump/download/job-abc'
+ )
+ expect(dataSource.rpc.startDumpJob).toHaveBeenCalledWith(
+ expect.objectContaining({ format: 'sql' })
+ )
+ })
+
+ it('startStreamingDumpRoute accepts format via query string', async () => {
+ const req = new Request(
+ 'https://api.example.com/export/dump?format=csv',
+ { method: 'POST' }
+ )
+ await startStreamingDumpRoute(req, dataSource, config)
+ expect(dataSource.rpc.startDumpJob).toHaveBeenCalledWith(
+ expect.objectContaining({ format: 'csv' })
+ )
+ })
+
+ it('startStreamingDumpRoute rejects external data sources', async () => {
+ const external: DataSource = {
+ source: 'external',
+ rpc: makeRpcMock() as any,
+ } as any
+ const req = new Request('https://api.example.com/export/dump', {
+ method: 'POST',
+ })
+ const res = await startStreamingDumpRoute(req, external, config)
+ expect(res.status).toBe(400)
+ })
+
+ it('getDumpJobStatusRoute returns 404 for missing jobs', async () => {
+ ;(dataSource.rpc as any).getDumpJob = vi.fn().mockResolvedValue(null)
+ const req = new Request(
+ 'https://api.example.com/export/dump/status/nope'
+ )
+ const res = await getDumpJobStatusRoute('nope', req, dataSource)
+ expect(res.status).toBe(404)
+ })
+
+ it('getDumpJobStatusRoute adds downloadUrl only on completion', async () => {
+ const req = new Request(
+ 'https://api.example.com/export/dump/status/job-abc'
+ )
+ const res = await getDumpJobStatusRoute('job-abc', req, dataSource)
+ const json: any = await res.json()
+ expect(json.result.downloadUrl).toBeUndefined() // still processing
+ ;(dataSource.rpc as any).getDumpJob = vi.fn().mockResolvedValue({
+ jobId: 'job-abc',
+ status: 'completed',
+ format: 'sql',
+ objectKey: 'dumps/dump.sql',
+ progress: {
+ tables: [],
+ currentTableIndex: 0,
+ currentTable: null,
+ rowOffset: 0,
+ rowsDumped: 0,
+ bytesWritten: 0,
+ partsUploaded: 0,
+ startedAt: 1,
+ updatedAt: 2,
+ },
+ })
+ const res2 = await getDumpJobStatusRoute('job-abc', req, dataSource)
+ const json2: any = await res2.json()
+ expect(json2.result.downloadUrl).toBe(
+ 'https://api.example.com/export/dump/download/job-abc'
+ )
+ })
+
+ it('downloadDumpJobRoute streams the R2 body when available', async () => {
+ const body = new Response('hello world').body
+ ;(dataSource.rpc as any).getDumpDownloadBody = vi
+ .fn()
+ .mockResolvedValue({
+ body,
+ size: 11,
+ contentType: 'application/sql',
+ filename: 'dump.sql',
+ })
+ const res = await downloadDumpJobRoute('job-abc', dataSource)
+ expect(res.status).toBe(200)
+ expect(res.headers.get('Content-Length')).toBe('11')
+ expect(res.headers.get('Content-Disposition')).toBe(
+ 'attachment; filename="dump.sql"'
+ )
+ expect(await res.text()).toBe('hello world')
+ })
+
+ it('downloadDumpJobRoute returns 404 when dump is not ready', async () => {
+ ;(dataSource.rpc as any).getDumpDownloadBody = vi
+ .fn()
+ .mockResolvedValue(null)
+ const res = await downloadDumpJobRoute('job-abc', dataSource)
+ expect(res.status).toBe(404)
+ })
+
+ it('cancelDumpJobRoute round-trips through the DO RPC', async () => {
+ ;(dataSource.rpc as any).cancelDumpJob = vi.fn().mockResolvedValue({
+ jobId: 'job-abc',
+ status: 'cancelled',
+ format: 'sql',
+ objectKey: 'dumps/dump.sql',
+ progress: {
+ tables: [],
+ currentTableIndex: 0,
+ currentTable: null,
+ rowOffset: 0,
+ rowsDumped: 0,
+ bytesWritten: 0,
+ partsUploaded: 0,
+ startedAt: 1,
+ updatedAt: 2,
+ },
+ })
+ const res = await cancelDumpJobRoute('job-abc', dataSource)
+ expect(res.status).toBe(200)
+ const json: any = await res.json()
+ expect(json.result.status).toBe('cancelled')
+ })
+})
diff --git a/src/export/streaming-dump.ts b/src/export/streaming-dump.ts
new file mode 100644
index 0000000..6957e79
--- /dev/null
+++ b/src/export/streaming-dump.ts
@@ -0,0 +1,228 @@
+/**
+ * Streaming database dump implementation.
+ *
+ * Designed to work around two Cloudflare constraints that the legacy
+ * src/export/dump.ts cannot:
+ * 1. Durable Object size — a database can hold 1GB (soon 10GB), far more than
+ * the 128 MB of memory a Durable Object gets, yet the legacy path accumulates
+ * the whole dump in a JS string before responding.
+ * 2. 30-second wall-clock limit on a single Worker / DO request.
+ *
+ * The job runs inside the Durable Object itself: state is held in DO storage
+ * so it survives invocations, dump bytes are flushed straight into an R2
+ * multipart upload, and an alarm re-enters the DO to continue work whenever a
+ * single tick approaches the time budget.
+ *
+ * Pending (sub-5MiB) bytes that haven't been flushed as a multipart part yet
+ * live in a temporary R2 object — DO storage values are capped at 128 KiB, so
+ * the buffer cannot live in storage directly.
+ */
+
+export type DumpFormat = 'sql' | 'csv' | 'json'
+
+export type DumpStatus =
+ | 'queued'
+ | 'processing'
+ | 'completed'
+ | 'failed'
+ | 'cancelled'
+
+export interface DumpJobOptions {
+ format: DumpFormat
+ /** Optional callback URL invoked once the dump completes (success or failure). */
+ callbackUrl?: string
+ /** Optional single table to dump (CSV/JSON default to first table when omitted). */
+ table?: string
+ /** Optional chunk size override (rows per SELECT batch). Default 1000. */
+ chunkSize?: number
+}
+
+export interface DumpJobProgress {
+ tables: string[]
+ currentTableIndex: number
+ currentTable: string | null
+ rowOffset: number
+ rowsDumped: number
+ bytesWritten: number
+ partsUploaded: number
+ startedAt: number
+ updatedAt: number
+ completedAt?: number
+}
+
+export interface DumpJobState {
+ jobId: string
+ status: DumpStatus
+ format: DumpFormat
+ callbackUrl?: string
+ /** R2 object key the finished dump will be written under. */
+ objectKey: string
+ /** Multipart upload identifier, present while parts are being streamed. */
+ uploadId?: string
+ /** Completed parts of the multipart upload. */
+ parts: R2UploadedPart[]
+ /** R2 key for the not-yet-flushed leftover bytes, if any. */
+ pendingTempKey?: string
+ /** Size of the pending temp buffer (informational). */
+ pendingBufferBytes: number
+ /** Phase tracking — controls what runTick() does next. */
+ phase: 'header' | 'schema' | 'rows' | 'finalize' | 'done' | 'error'
+ /** Whether the schema for the current table has been emitted yet. */
+ schemaEmitted: boolean
+ /** Cached column names for the current table when streaming rows. */
+ currentColumns: string[] | null
+ /** Whether the JSON array for the current table has been opened. */
+ jsonTableOpened: boolean
+ /** Whether at least one row has been written into the current JSON array. */
+ jsonRowWritten: boolean
+ error?: string
+ options: { chunkSize: number }
+ progress: DumpJobProgress
+}
+
+/** R2 multipart minimum part size — 5 MiB per Cloudflare docs. Last part may be smaller. */
+export const R2_MIN_PART_SIZE = 5 * 1024 * 1024
+
+/** Maximum wall-clock time we spend in a single tick before yielding to the alarm. */
+export const TICK_BUDGET_MS = 20_000
+
+/** Default rows fetched per SELECT during the dump. */
+export const DEFAULT_CHUNK_SIZE = 1000
+
+/**
+ * Render a table or column name as a double-quoted SQL identifier. Any double
+ * quote inside the name is doubled so the identifier cannot be closed early.
+ * Non-string input is stringified first.
+ */
+export function quoteIdent(name: string): string {
+    const escaped = String(name).split('"').join('""')
+    return '"' + escaped + '"'
+}
+
+/** Wrap `value` in single quotes as a SQL string literal, doubling embedded single quotes. */
+export function quoteString(value: string): string {
+    return "'" + value.split("'").join("''") + "'"
+}
+
+/**
+ * Turn a JS value into the SQL literal used in a generated INSERT statement.
+ *
+ * - `null` / `undefined` and non-finite numbers become `NULL`
+ * - finite numbers and bigints are written as-is, booleans as `1` / `0`
+ * - `ArrayBuffer` and typed-array/DataView values become an `x'..'` hex literal
+ * - any other object is JSON-encoded and quoted; everything else is
+ *   stringified and quoted
+ */
+export function sqlLiteral(value: unknown): string {
+    if (value == null) return 'NULL'
+
+    switch (typeof value) {
+        case 'number':
+            return Number.isFinite(value) ? String(value) : 'NULL'
+        case 'bigint':
+            return value.toString()
+        case 'boolean':
+            return value ? '1' : '0'
+        case 'object': {
+            if (!(value instanceof ArrayBuffer) && !ArrayBuffer.isView(value)) {
+                return quoteString(JSON.stringify(value))
+            }
+            // Respect the view's window instead of dumping its whole buffer.
+            const bytes =
+                value instanceof ArrayBuffer
+                    ? new Uint8Array(value)
+                    : new Uint8Array(
+                          value.buffer,
+                          value.byteOffset,
+                          value.byteLength
+                      )
+            const hex = Array.from(bytes, (b) =>
+                b.toString(16).padStart(2, '0')
+            ).join('')
+            return `x'${hex}'`
+        }
+        default:
+            return quoteString(String(value))
+    }
+}
+
+/**
+ * Render a value as one CSV cell, quoting per RFC 4180.
+ *
+ * - `null` / `undefined` become an empty cell
+ * - numbers and bigints are stringified, booleans become `true` / `false`
+ * - binary values (`ArrayBuffer`, typed arrays, DataView) are written as
+ *   lowercase hex
+ * - other objects are JSON-encoded, everything else is stringified
+ *
+ * The result is wrapped in double quotes (with embedded quotes doubled) when
+ * it contains a quote, comma, CR or LF.
+ */
+export function csvCell(value: unknown): string {
+    if (value === null || value === undefined) return ''
+    if (typeof value === 'number' || typeof value === 'bigint')
+        return String(value)
+    if (typeof value === 'boolean') return value ? 'true' : 'false'
+    if (value instanceof ArrayBuffer || ArrayBuffer.isView(value)) {
+        // JSON.stringify() renders an ArrayBuffer as `{}`, silently dropping
+        // the bytes, so binary values are hex-encoded instead.
+        const bytes =
+            value instanceof ArrayBuffer
+                ? new Uint8Array(value)
+                : new Uint8Array(
+                      value.buffer,
+                      value.byteOffset,
+                      value.byteLength
+                  )
+        return Array.from(bytes, (b) =>
+            b.toString(16).padStart(2, '0')
+        ).join('')
+    }
+    // JSON.stringify() may return undefined (e.g. toJSON() yielding nothing).
+    const str =
+        typeof value === 'object'
+            ? (JSON.stringify(value) ?? '')
+            : String(value)
+    if (/[",\r\n]/.test(str)) return `"${str.replace(/"/g, '""')}"`
+    return str
+}
+
+/**
+ * Build the R2 object key a job's dump is stored under:
+ * `dumps/dump_<timestamp>_<first 8 chars of jobId>.<format>`. The timestamp is
+ * the current UTC time with `:` and `.` turned into `-`, `T` into `_`, and the
+ * trailing `Z` removed.
+ */
+export function buildObjectKey(jobId: string, format: DumpFormat): string {
+    const iso = new Date().toISOString()
+    const stamp = iso.replace(/[:.]/g, '-').replace('T', '_').replace('Z', '')
+    const shortId = jobId.slice(0, 8)
+    // The format name doubles as the file extension.
+    return `dumps/dump_${stamp}_${shortId}.${format}`
+}
+
+/** R2 key of the temporary object that holds a job's not-yet-flushed bytes. */
+export function buildPendingKey(jobId: string): string {
+    return ['dumps', '.pending', `${jobId}.tmp`].join('/')
+}
+
+/**
+ * Create the initial state for a freshly queued job. The caller is expected
+ * to attach an R2 uploadId once the multipart upload has been created.
+ *
+ * The job starts in the `header` phase with status `queued`, no uploaded
+ * parts, and progress counters at zero. `options.chunkSize` is kept only when
+ * it is a positive integer; any other value falls back to
+ * `DEFAULT_CHUNK_SIZE`.
+ */
+export function newJobState(
+    jobId: string,
+    options: DumpJobOptions,
+    tables: string[]
+): DumpJobState {
+    const now = Date.now()
+    // Zero, negative, fractional or non-numeric page sizes are not usable, so
+    // they are replaced with the default rather than stored in the job state.
+    const requested: unknown = options.chunkSize
+    const chunkSize =
+        typeof requested === 'number' &&
+        Number.isInteger(requested) &&
+        requested > 0
+            ? requested
+            : DEFAULT_CHUNK_SIZE
+    return {
+        jobId,
+        status: 'queued',
+        format: options.format,
+        callbackUrl: options.callbackUrl,
+        objectKey: buildObjectKey(jobId, options.format),
+        uploadId: undefined,
+        parts: [],
+        pendingTempKey: undefined,
+        pendingBufferBytes: 0,
+        phase: 'header',
+        schemaEmitted: false,
+        currentColumns: null,
+        jsonTableOpened: false,
+        jsonRowWritten: false,
+        options: { chunkSize },
+        progress: {
+            tables,
+            currentTableIndex: 0,
+            currentTable: tables[0] ?? null,
+            rowOffset: 0,
+            rowsDumped: 0,
+            bytesWritten: 0,
+            partsUploaded: 0,
+            startedAt: now,
+            updatedAt: now,
+        },
+    }
+}
+
+/**
+ * A public-facing view of the job state, suitable for serializing into a
+ * status endpoint response. Strips engine internals.
+ */
+export interface DumpJobStatusView {
+ jobId: string
+ status: DumpStatus
+ format: DumpFormat
+ objectKey: string
+ error?: string
+ progress: DumpJobProgress
+}
+
+export function toStatusView(state: DumpJobState): DumpJobStatusView {
+    // Copy only the public fields; upload ids, parts and buffer bookkeeping
+    // stay out of the view. `progress` is passed through by reference.
+    const { jobId, status, format, objectKey, error, progress } = state
+    return { jobId, status, format, objectKey, error, progress }
+}
diff --git a/src/handler.test.ts b/src/handler.test.ts
index 86bb328..c292d5a 100644
--- a/src/handler.test.ts
+++ b/src/handler.test.ts
@@ -22,6 +22,9 @@ vi.mock('hono', () => {
use: vi.fn(),
post: vi.fn(),
get: vi.fn(),
+ put: vi.fn(),
+ delete: vi.fn(),
+ patch: vi.fn(),
all: vi.fn(),
fetch: vi.fn().mockResolvedValue(new Response('mock-response')),
notFound: vi.fn(),
diff --git a/src/handler.ts b/src/handler.ts
index 3fa0085..fa0241e 100644
--- a/src/handler.ts
+++ b/src/handler.ts
@@ -6,7 +6,13 @@ import { DataSource } from './types'
import { LiteREST } from './literest'
import { executeQuery, executeTransaction } from './operation'
import { createResponse, QueryRequest, QueryTransactionRequest } from './utils'
-import { dumpDatabaseRoute } from './export/dump'
+import {
+ cancelDumpJobRoute,
+ downloadDumpJobRoute,
+ dumpDatabaseRoute,
+ getDumpJobStatusRoute,
+ startStreamingDumpRoute,
+} from './export/dump'
import { exportTableToJsonRoute } from './export/json'
import { exportTableToCsvRoute } from './export/csv'
import { importDumpRoute } from './import/dump'
@@ -120,10 +126,54 @@ export class StarbaseDB {
}
if (this.getFeature('export')) {
+ // Legacy synchronous dump — still works for small databases that
+ // can finish within the 30s worker budget.
this.app.get('/export/dump', this.isInternalSource, async () => {
return dumpDatabaseRoute(this.dataSource, this.config)
})
+ // Streaming dump: kicks off a background job whose output is
+ // written to R2 over potentially many DO alarm ticks. Returns 202
+ // with a jobId the client uses to poll status and download.
+ this.app.post('/export/dump', this.isInternalSource, async (c) => {
+ return startStreamingDumpRoute(
+ c.req.raw,
+ this.dataSource,
+ this.config
+ )
+ })
+
+ this.app.get(
+ '/export/dump/status/:jobId',
+ this.isInternalSource,
+ async (c) => {
+ const jobId = c.req.param('jobId')
+ return getDumpJobStatusRoute(
+ jobId,
+ c.req.raw,
+ this.dataSource
+ )
+ }
+ )
+
+ this.app.get(
+ '/export/dump/download/:jobId',
+ this.isInternalSource,
+ async (c) => {
+ const jobId = c.req.param('jobId')
+ return downloadDumpJobRoute(jobId, this.dataSource)
+ }
+ )
+
+ this.app.delete(
+ '/export/dump/:jobId',
+ this.isInternalSource,
+ async (c) => {
+ const jobId = c.req.param('jobId')
+ return cancelDumpJobRoute(jobId, this.dataSource)
+ }
+ )
+
this.app.get(
'/export/json/:tableName',
this.isInternalSource,
diff --git a/src/index.ts b/src/index.ts
index 4d08932..c878284 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -56,6 +56,11 @@ export interface Env {
HYPERDRIVE: Hyperdrive
+ // Optional R2 bucket used to persist long-running database dumps so they
+ // can survive the 30 second Worker / DO request limit. When unbound the
+ // /export/dump endpoint falls back to the original synchronous behavior.
+ DATABASE_DUMPS?: R2Bucket
+
// ## DO NOT REMOVE: TEMPLATE INTERFACE ##
}
diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts
index 6c35c6f..37409ae 100644
--- a/worker-configuration.d.ts
+++ b/worker-configuration.d.ts
@@ -13,4 +13,5 @@ interface Env {
DATABASE_DURABLE_OBJECT: DurableObjectNamespace<
import('./src/index').StarbaseDBDurableObject
>
+ DATABASE_DUMPS?: R2Bucket
}
diff --git a/wrangler.toml b/wrangler.toml
index 395c4ac..22f119f 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -33,6 +33,13 @@ class_name = "StarbaseDBDurableObject"
tag = "v1"
new_sqlite_classes = ["StarbaseDBDurableObject"]
+# R2 bucket used to stream large database dumps. Required for /export/dump on
+# databases that exceed the inline 30-second / 1GB synchronous limits.
+# Create the bucket with: `wrangler r2 bucket create starbasedb-dumps`
+[[r2_buckets]]
+binding = "DATABASE_DUMPS"
+bucket_name = "starbasedb-dumps"
+
[vars]
# Use this in your Authorization header for full database access
ADMIN_AUTHORIZATION_TOKEN = "ABC123"