Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,54 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
</code>
</pre>

<h4>Streaming SQL dumps for large databases</h4>
The synchronous endpoint above buffers the whole dump in memory and is bounded by the 30-second Worker timeout. For databases that exceed that budget, switch to the streaming variant — it paginates rows, writes them straight into R2 over (potentially many) Durable Object alarm ticks, and lets you download the finished artifact when ready.

Add an R2 binding called `DATABASE_DUMPS` to your `wrangler.toml`:

<pre>
<code>
[[r2_buckets]]
binding = "DATABASE_DUMPS"
bucket_name = "starbasedb-dumps"
</code>
</pre>

Kick off the job (returns 202 with a `jobId`):

<pre>
<code>
curl -X POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
--header 'Authorization: Bearer ABC123' \
--header 'Content-Type: application/json' \
--data '{ "format": "sql", "callbackUrl": "https://hooks.example.com/dump-done" }'
</code>
</pre>

`format` may be `sql`, `csv`, or `json`. Optional fields: `callbackUrl` (receives a POST containing the job's status view as JSON when the job completes or fails), `table` (export a single table only), `chunkSize` (rows per SELECT batch; default 1000).

Poll the status, then download once it reads `completed`:

<pre>
<code>
curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/status/JOB_ID' \
--header 'Authorization: Bearer ABC123'

curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/download/JOB_ID' \
--header 'Authorization: Bearer ABC123' \
--output database_dump.sql
</code>
</pre>

To cancel an in-flight job:

<pre>
<code>
curl -X DELETE 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/JOB_ID' \
--header 'Authorization: Bearer ABC123'
</code>
</pre>

<h3>JSON Data Export</h3>
<pre>
<code>
Expand Down
257 changes: 253 additions & 4 deletions src/do.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
import { DurableObject } from 'cloudflare:workers'
import { runTick } from './export/dump-engine'
import { createDumpHost, jobStateKey } from './export/do-dump-host'
import {
DumpFormat,
DumpJobOptions,
DumpJobState,
DumpJobStatusView,
newJobState,
toStatusView,
} from './export/streaming-dump'

// Durable storage key holding the jobId of the dump job that alarm() should
// continue. It is deleted once that job completes, fails, or is cancelled.
const ACTIVE_DUMP_KEY = 'dump:active'
// Delay in milliseconds before the alarm is re-armed to run the next tick of
// a dump job that still has work remaining.
const DUMP_RESUME_DELAY_MS = 1_000

export class StarbaseDBDurableObject extends DurableObject {
// Durable storage for the SQL database
Expand All @@ -9,6 +22,8 @@ export class StarbaseDBDurableObject extends DurableObject {
public connections = new Map<string, WebSocket>()
// Store the client auth token for requests back to our Worker
private clientAuthToken: string
// R2 bucket binding for streaming dump output (optional).
private dumpBucket: R2Bucket | undefined

/**
* The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
Expand All @@ -22,6 +37,7 @@ export class StarbaseDBDurableObject extends DurableObject {
this.clientAuthToken = env.CLIENT_AUTHORIZATION_TOKEN
this.sql = ctx.storage.sql
this.storage = ctx.storage
this.dumpBucket = env.DATABASE_DUMPS

// Install default necessary `tmp_` tables for various features here.
const cacheStatement = `
Expand Down Expand Up @@ -72,6 +88,10 @@ export class StarbaseDBDurableObject extends DurableObject {
deleteAlarm: this.deleteAlarm.bind(this),
getStatistics: this.getStatistics.bind(this),
executeQuery: this.executeQuery.bind(this),
startDumpJob: this.startDumpJob.bind(this),
getDumpJob: this.getDumpJob.bind(this),
getDumpDownloadBody: this.getDumpDownloadBody.bind(this),
cancelDumpJob: this.cancelDumpJob.bind(this),
}
}

Expand Down Expand Up @@ -105,12 +125,28 @@ export class StarbaseDBDurableObject extends DurableObject {
}

async alarm() {
// Dispatch streaming dump work first so a long-running export does not
// get starved by other alarm-driven features. The dump engine sets its
// own continuation alarm if it needs another tick.
try {
await this.continueActiveDumpJob()
} catch (err) {
console.error('Failed to continue dump job:', err)
}

try {
// Fetch all the tasks that are marked to emit an event for this cycle.
const task = (await this.executeQuery({
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
isRaw: false,
})) as Record<string, SqlStorageValue>[]
// The cron table is created lazily by the CronPlugin; if it does not
// exist yet (fresh DO with no cron plugin installed) we silently skip.
let task: Record<string, SqlStorageValue>[] = []
try {
task = (await this.executeQuery({
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
isRaw: false,
})) as Record<string, SqlStorageValue>[]
} catch {
return
}

if (!task.length) {
return
Expand Down Expand Up @@ -148,6 +184,219 @@ export class StarbaseDBDurableObject extends DurableObject {
}
}

// -- Streaming dump job machinery -------------------------------------

/**
 * Begin a new streaming dump job.
 *
 * Opens the R2 multipart upload, persists the initial job state, records the
 * job as the active dump, and schedules an alarm to run the first tick.
 * Only one job id is tracked under ACTIVE_DUMP_KEY, so the call is rejected
 * while a previously started job has not reached a terminal status.
 *
 * @param options - Job options; `table` restricts the export to one table
 *   and `format` selects the content type stored on the R2 object.
 * @returns A status view of the new job that the worker can hand back to the
 *   client right away.
 * @throws Error when the DATABASE_DUMPS R2 binding is missing, or when
 *   another dump job is still in progress.
 */
public async startDumpJob(
    options: DumpJobOptions
): Promise<DumpJobStatusView> {
    const bucket = this.dumpBucket
    if (!bucket) {
        throw new Error(
            'Streaming dump requires the DATABASE_DUMPS R2 binding. ' +
                'Add an [[r2_buckets]] entry to wrangler.toml.'
        )
    }

    // Overwriting ACTIVE_DUMP_KEY while its job is unfinished would leave
    // that job without further ticks and its multipart upload open, so
    // refuse to start instead. A pointer to a missing or already-terminal
    // job is stale and may be replaced.
    const activeId = await this.storage.get<string>(ACTIVE_DUMP_KEY)
    if (activeId) {
        const active = await this.storage.get<DumpJobState>(
            jobStateKey(activeId)
        )
        if (
            active &&
            active.status !== 'completed' &&
            active.status !== 'failed' &&
            active.status !== 'cancelled'
        ) {
            throw new Error(
                `A dump job is already in progress (jobId: ${activeId}). ` +
                    'Wait for it to finish or cancel it before starting another.'
            )
        }
    }

    const tables = await this.listUserTables(options.table)
    const jobId = crypto.randomUUID()
    const state = newJobState(jobId, options, tables)

    // Open the R2 multipart upload up-front so part numbers and uploadId
    // are stable across alarm-driven continuations.
    const upload = await bucket.createMultipartUpload(state.objectKey, {
        httpMetadata: {
            contentType: this.contentTypeFor(options.format),
            contentDisposition: `attachment; filename="${state.objectKey.split('/').pop()}"`,
        },
    })
    state.uploadId = upload.uploadId

    try {
        await this.storage.put(jobStateKey(jobId), state)
        await this.storage.put(ACTIVE_DUMP_KEY, jobId)
        await this.setAlarm(Date.now() + DUMP_RESUME_DELAY_MS)
    } catch (err) {
        // The job could not be registered: roll back (best effort) so no
        // multipart upload or half-written job state is left behind.
        try {
            await upload.abort()
            await this.storage.delete(jobStateKey(jobId))
            await this.storage.delete(ACTIVE_DUMP_KEY)
        } catch (cleanupErr) {
            console.error(
                'Failed to clean up after dump job start error:',
                cleanupErr
            )
        }
        throw err
    }
    return toStatusView(state)
}

/**
 * Look up a dump job for the status endpoint.
 *
 * @param jobId - Identifier of the job whose stored state should be read.
 * @returns The status view built from the stored state, or `null` when no
 *   state is stored for `jobId`.
 */
public async getDumpJob(jobId: string): Promise<DumpJobStatusView | null> {
    const stored = await this.storage.get<DumpJobState>(jobStateKey(jobId))
    return stored ? toStatusView(stored) : null
}

/**
 * Fetch the finished dump object from R2 so the worker can relay it to the
 * client.
 *
 * @param jobId - Identifier of the job whose output should be downloaded.
 * @returns The object's body stream, size, content type and filename, or
 *   `null` when the R2 binding is missing, the job is unknown or not
 *   `completed`, or the object is not found in the bucket.
 */
public async getDumpDownloadBody(jobId: string): Promise<{
    body: ReadableStream
    size: number
    contentType: string
    filename: string
} | null> {
    const bucket = this.dumpBucket
    if (!bucket) {
        return null
    }

    const job = await this.storage.get<DumpJobState>(jobStateKey(jobId))
    if (!job) {
        return null
    }
    if (job.status !== 'completed') {
        return null
    }

    const stored = await bucket.get(job.objectKey)
    if (!stored) {
        return null
    }

    // Prefer the content type recorded on the object; fall back to the one
    // derived from the job's format.
    const contentType =
        stored.httpMetadata?.contentType ?? this.contentTypeFor(job.format)
    // Last path segment of the object key, or the whole key as a fallback.
    const filename = job.objectKey.split('/').pop() ?? job.objectKey

    return { body: stored.body, size: stored.size, contentType, filename }
}

/**
 * Cancel an in-flight dump job.
 *
 * Aborts the R2 multipart upload, deletes any pending temporary object,
 * marks the job `cancelled`, and clears the active-job pointer when it
 * refers to this job. A job that already reached a terminal status
 * (`completed`, `failed`, or `cancelled`) is returned unchanged, so
 * repeating a cancel request is harmless.
 *
 * @param jobId - Identifier of the job to cancel.
 * @returns The job's status view, or `null` when no state is stored for
 *   `jobId`.
 */
public async cancelDumpJob(
    jobId: string
): Promise<DumpJobStatusView | null> {
    const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
    if (!state) return null
    // Same terminal set that continueActiveDumpJob() uses; without
    // 'cancelled' here a second cancel would re-abort the upload and
    // rewrite the stored state.
    if (
        state.status === 'completed' ||
        state.status === 'failed' ||
        state.status === 'cancelled'
    ) {
        return toStatusView(state)
    }

    if (this.dumpBucket && state.uploadId) {
        try {
            const upload = this.dumpBucket.resumeMultipartUpload(
                state.objectKey,
                state.uploadId
            )
            await upload.abort()
        } catch (err) {
            // Best effort: the job is still marked cancelled below.
            console.error('Failed to abort R2 multipart upload:', err)
        }
    }
    if (this.dumpBucket && state.pendingTempKey) {
        try {
            await this.dumpBucket.delete(state.pendingTempKey)
        } catch (err) {
            // Best effort as well, but leave a trace instead of swallowing.
            console.error('Failed to delete pending dump temp object:', err)
        }
    }

    state.status = 'cancelled'
    state.error = 'Cancelled by user'
    state.progress.updatedAt = Date.now()
    await this.storage.put(jobStateKey(jobId), state)

    // Only clear the pointer when it still refers to this job.
    const active = await this.storage.get<string>(ACTIVE_DUMP_KEY)
    if (active === jobId) await this.storage.delete(ACTIVE_DUMP_KEY)
    return toStatusView(state)
}

/**
 * Run one tick of the active dump job (if any). Called from alarm().
 *
 * Clears the active-job pointer when it is stale (binding missing, state
 * missing, or job already terminal). Otherwise runs one engine tick,
 * persists the resulting state, and either re-arms the alarm (more work
 * remains) or clears the pointer and fires the callback (job done or
 * failed). If the job was cancelled while the tick was running, the
 * cancelled state written by cancelDumpJob() is left untouched.
 */
private async continueActiveDumpJob(): Promise<void> {
    const activeId = await this.storage.get<string>(ACTIVE_DUMP_KEY)
    if (!activeId) return
    if (!this.dumpBucket) {
        console.error(
            'Active dump job exists but DATABASE_DUMPS R2 binding is missing.'
        )
        await this.storage.delete(ACTIVE_DUMP_KEY)
        return
    }
    const state = await this.storage.get<DumpJobState>(
        jobStateKey(activeId)
    )
    if (!state) {
        await this.storage.delete(ACTIVE_DUMP_KEY)
        return
    }
    if (
        state.status === 'completed' ||
        state.status === 'failed' ||
        state.status === 'cancelled'
    ) {
        await this.storage.delete(ACTIVE_DUMP_KEY)
        return
    }

    const host = createDumpHost({
        sql: this.sql,
        storage: this.storage,
        bucket: this.dumpBucket,
    })

    // `state` is a snapshot taken before the tick. cancelDumpJob() may have
    // stored a 'cancelled' status while the tick was awaiting, and writing
    // the snapshot back would silently undo that cancellation.
    // NOTE(review): assumes runTick awaits non-storage I/O (the R2 bucket),
    // during which other calls into this object can run -- confirm.
    const cancelledMeanwhile = async (): Promise<boolean> => {
        const latest = await this.storage.get<DumpJobState>(
            jobStateKey(state.jobId)
        )
        return latest?.status === 'cancelled'
    }

    try {
        const { done } = await runTick(state, host)
        if (await cancelledMeanwhile()) return
        await this.storage.put(jobStateKey(state.jobId), state)
        if (done) {
            await this.storage.delete(ACTIVE_DUMP_KEY)
            if (state.callbackUrl) {
                this.fireDumpCallback(state).catch((err) =>
                    console.error('Dump callback failed:', err)
                )
            }
        } else {
            await this.setAlarm(Date.now() + DUMP_RESUME_DELAY_MS)
        }
    } catch (err) {
        console.error('Dump engine error:', err)
        // A tick that fails because the job was cancelled underneath it
        // must stay 'cancelled' rather than being reported as 'failed'.
        if (await cancelledMeanwhile()) return
        // Persist the failure so callers see it via the status endpoint.
        // NOTE(review): the engine is said to set status='failed' before
        // throwing; it is set here as well in case it did not.
        state.status = 'failed'
        state.error =
            err instanceof Error ? err.message : String(err ?? 'unknown')
        state.progress.updatedAt = Date.now()
        await this.storage.put(jobStateKey(state.jobId), state)
        await this.storage.delete(ACTIVE_DUMP_KEY)
        if (state.callbackUrl) {
            this.fireDumpCallback(state).catch((cbErr) =>
                console.error('Dump failure callback failed:', cbErr)
            )
        }
    }
}

/**
 * POST the job's status view as JSON to the job's callback URL.
 *
 * Delivery is best effort: network errors and non-2xx responses are logged
 * and never thrown. Does nothing when the job has no `callbackUrl`.
 *
 * @param state - Job state whose status view is sent to `state.callbackUrl`.
 */
private async fireDumpCallback(state: DumpJobState): Promise<void> {
    if (!state.callbackUrl) return
    try {
        const response = await fetch(state.callbackUrl, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(toStatusView(state)),
        })
        // fetch() only rejects on network-level failures; an HTTP error
        // status resolves normally and has to be checked explicitly.
        if (!response.ok) {
            console.error(
                `Dump completion callback responded with HTTP ${response.status}`
            )
        }
    } catch (err) {
        console.error('Failed to fire dump completion callback:', err)
    }
}

/**
 * List the tables a dump job should export.
 *
 * Without `only`, returns every table in `sqlite_master` except those whose
 * names start with `sqlite_`, `tmp_` or `_cf_`, sorted by name. With `only`,
 * the prefix filters are not applied: the result is `[only]` when a table
 * with that exact name exists and `[]` otherwise.
 *
 * @param only - Optional name of the single table to export.
 * @returns The table names to export.
 */
private async listUserTables(only?: string): Promise<string[]> {
    if (only) {
        const exists = (await this.executeQuery({
            sql: 'SELECT name FROM sqlite_master WHERE type = ? AND name = ?;',
            params: ['table', only],
            isRaw: false,
        })) as Record<string, SqlStorageValue>[]
        return exists.length ? [only] : []
    }
    // `_` is a single-character wildcard in LIKE, so an unescaped 'tmp_%'
    // would also hide user tables such as `tmpl_items`. Escape the
    // underscores so only the literal prefixes are excluded.
    const rows = (await this.executeQuery({
        sql: "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\' AND name NOT LIKE 'tmp\\_%' ESCAPE '\\' AND name NOT LIKE '\\_cf\\_%' ESCAPE '\\' ORDER BY name;",
        isRaw: false,
    })) as Record<string, SqlStorageValue>[]
    return rows.map((r) => String(r.name))
}

/**
 * Map a dump format to the MIME type used for the R2 object and download.
 *
 * @param format - Requested dump format.
 * @returns `text/csv` for `csv`, `application/json` for `json`, and
 *   `application/sql` for anything else.
 */
private contentTypeFor(format: DumpFormat): string {
    switch (format) {
        case 'csv':
            return 'text/csv'
        case 'json':
            return 'application/json'
        default:
            return 'application/sql'
    }
}

public async getStatistics(): Promise<{
databaseSize: number
activeConnections: number
Expand Down
Loading