Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/fix-sqlite-wasm-tx-export-1494.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@objectstack/driver-sqlite-wasm": patch
---

Fix `COMMIT; - cannot commit - no transaction is active` under `persist: 'on-write'` (#1494).

sql.js's `Database.export()` closes and reopens the database (it has no in-place
serialize), which rolls back any open transaction. The fire-and-forget flush
triggered after a write inside a Knex transaction (e.g. the autonumber sequence
`BEGIN…COMMIT`) could therefore abort that transaction, leaving the trailing
`COMMIT` to fail. The connection is now transaction-aware: `flush()` is deferred
while a transaction is open and runs once it fully closes, so committed data is
still persisted without aborting in-flight transactions.
2 changes: 1 addition & 1 deletion .objectui-sha
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1508a8d32b881335f5820dd47fbb91470c13cbad
fdd083657e2da9832059492d4c88e818a5990a8d
12 changes: 11 additions & 1 deletion packages/plugins/driver-sqlite-wasm/src/knex-wasm-dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,17 @@ export function getClient_WasmSqlite(): any {
if (isDdl) {
db.run(obj.sql, bindings as any);
obj.response = [];
connection.markDirty('run');
// Transaction-control statements are routed through
// `noteTransactionControl`, which owns flushing for the transaction
// lifecycle: it suppresses flushes while a transaction is open (sql.js
// `export()` closes+reopens the db, which would abort the txn) and
// performs a single flush once the transaction fully closes. Routing
// them away from `markDirty` avoids a second, racing flush on COMMIT.
if (/^\s*(BEGIN|COMMIT|END|ROLLBACK|SAVEPOINT|RELEASE)\b/i.test(obj.sql)) {
connection.noteTransactionControl(obj.sql);
} else {
connection.markDirty('run');
}
return obj;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect, afterEach } from 'vitest';
import { mkdtempSync, rmSync, statSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

import { SqliteWasmDriver } from '../src/index.js';

/**
* Regression for #1494: under `persist: 'on-write'`, a write issued inside a
* Knex transaction triggers a fire-and-forget flush. sql.js's `export()`
* closes and reopens the database (it has no in-place serialize), which rolls
* back the open transaction — so the eventual `COMMIT` failed with
* "cannot commit - no transaction is active".
*
* The driver must defer the flush until the transaction fully closes, so the
* transaction commits cleanly and the data still lands on disk afterwards.
*/
describe('SqliteWasmDriver on-write persistence + transactions (#1494)', () => {
const dirs: string[] = [];
const drivers: SqliteWasmDriver[] = [];

function newDriver(persist: 'on-write' | 'on-disconnect' = 'on-write') {
const dir = mkdtempSync(join(tmpdir(), 'wasm-tx-'));
const file = join(dir, 'test.db');
const driver = new SqliteWasmDriver({ filename: file, persist });
dirs.push(dir);
drivers.push(driver);
return { driver, dir, file };
}

afterEach(async () => {
await Promise.all(drivers.splice(0).map((d) => d.disconnect().catch(() => {})));
for (const dir of dirs.splice(0)) rmSync(dir, { recursive: true, force: true });
});

it('commits a multi-statement transaction without aborting it via flush', async () => {
const { driver } = newDriver('on-write');
await driver.initObjects([{ name: 'acct', fields: { name: { type: 'string' } } }]);
const knex = (driver as any).knex;

await knex.transaction(async (trx: any) => {
// Plain insert (no RETURNING) takes the write path → markDirty → flush.
await trx('acct').insert({ id: 'a1', name: 'A' });
// An await inside the transaction gives the deferred flush a chance to
// run its export() — which previously rolled the transaction back.
await trx('acct').where('id', 'a1').first();
await trx('acct').insert({ id: 'a2', name: 'B' });
});

expect((await knex('acct')).length).toBe(2);
});

it('persists committed rows to disk after the transaction closes', async () => {
const { driver, file } = newDriver('on-write');
await driver.initObjects([{ name: 'acct', fields: { name: { type: 'string' } } }]);
const knex = (driver as any).knex;

await knex.transaction(async (trx: any) => {
await trx('acct').insert({ id: 'p1', name: 'persisted' });
});
// flush() awaits the post-commit write to disk (the per-write persist is
// deferred until the transaction closes, then performed exactly once).
await (driver as any).flush();

expect(statSync(file).size).toBeGreaterThan(0);

// Reopen from disk in a second driver to prove the row survived.
const reopened = new SqliteWasmDriver({ filename: file, persist: 'on-disconnect' });
drivers.push(reopened);
const rows = await (reopened as any).knex('acct').where('id', 'p1');
expect(rows.length).toBe(1);
expect(rows[0].name).toBe('persisted');
});

it('autonumber inserts (internal transaction) succeed under on-write', async () => {
const { driver } = newDriver('on-write');
await driver.initObjects([
{
name: 'acct',
fields: {
name: { type: 'string' },
num: { type: 'autonumber', format: 'A-{0000}' },
},
},
]);

// Each create runs getNextSequenceValue() in its own BEGIN…COMMIT, with a
// non-RETURNING sequence write inside it — the exact shape that tripped.
const out: string[] = [];
for (let i = 0; i < 10; i++) {
const r = await driver.create('acct', { name: `R${i}` });
out.push(r.num);
}
expect(out[0]).toBe('A-0001');
expect(out[9]).toBe('A-0010');
expect(new Set(out).size).toBe(10);
});

it('handles nested transactions (savepoints) without flushing mid-transaction', async () => {
const { driver } = newDriver('on-write');
await driver.initObjects([
{ name: 'acct', fields: { name: { type: 'string' } } },
{ name: 'log', fields: { msg: { type: 'string' } } },
]);
const knex = (driver as any).knex;

await knex.transaction(async (trx: any) => {
await trx('acct').insert({ id: 'a1', name: 'A' });
await trx.transaction(async (inner: any) => {
await inner('log').insert({ id: 'l1', msg: 'nested' });
await inner('log').where('id', 'l1').first();
});
await trx('acct').insert({ id: 'a2', name: 'B' });
});

expect((await knex('acct')).length).toBe(2);
expect((await knex('log')).length).toBe(1);
});
});
127 changes: 97 additions & 30 deletions packages/plugins/driver-sqlite-wasm/src/wasm-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,30 @@ export class WasmSqliteConnection {
private dirty = false;
private debounceMs = 0;
private debounceTimer: ReturnType<typeof setTimeout> | null = null;
private pendingFlush: Promise<void> | null = null;
private flushChain: Promise<void> | null = null;
private destroyed = false;
private logger: { warn: (msg: string, meta?: unknown) => void };

/**
* Whether a `BEGIN…COMMIT/ROLLBACK` transaction is currently open. Tracked
* because sql.js's {@link Database.export} closes and reopens the database
* (it has no in-place serialize), and closing a connection rolls back any
* open transaction. Flushing mid-transaction would therefore silently
* abort it, leaving the eventual `COMMIT` to fail with
* "cannot commit - no transaction is active". We defer the flush until the
* transaction fully closes. See {@link noteTransactionControl}.
*/
private rootTxActive = false;
/** Open `SAVEPOINT` depth (nested transactions emitted by Knex). */
private savepointDepth = 0;
/** A flush was requested while a transaction was open; run it on close. */
private flushDeferred = false;

/** True while any transaction (root or savepoint) is in flight. */
private get inTransaction(): boolean {
return this.rootTxActive || this.savepointDepth > 0;
}

constructor(opts: WasmConnectionOptions) {
this.filename = opts.filename;
this.persist = opts.persist ?? 'on-disconnect';
Expand Down Expand Up @@ -160,6 +180,45 @@ export class WasmSqliteConnection {
this.db = bytes ? new SQL.Database(bytes) : new SQL.Database();
}

/**
* Update transaction state from a transaction-control statement and, when a
* transaction has just fully closed, run any flush that was deferred while
* it was open. Called by the Knex dialect for every `BEGIN` / `COMMIT` /
* `ROLLBACK` / `SAVEPOINT` / `RELEASE` statement.
*
* We bias toward "in transaction": an unrecognised form leaves the flag set,
* which at worst delays a flush (safe) rather than exporting mid-transaction
* (which would abort it).
*/
noteTransactionControl(sql: string): void {
const s = sql.trim().toUpperCase();
if (/^BEGIN\b/.test(s)) {
this.rootTxActive = true;
} else if (/^(COMMIT|END)\b/.test(s)) {
// A COMMIT/END ends the whole transaction regardless of savepoint nesting.
this.rootTxActive = false;
this.savepointDepth = 0;
} else if (/^ROLLBACK\s+TO\b/.test(s)) {
// Rolls back to a savepoint but keeps the (outer) transaction open.
} else if (/^ROLLBACK\b/.test(s)) {
this.rootTxActive = false;
this.savepointDepth = 0;
} else if (/^SAVEPOINT\b/.test(s)) {
this.savepointDepth += 1;
} else if (/^RELEASE\b/.test(s)) {
this.savepointDepth = Math.max(0, this.savepointDepth - 1);
}
// If the transaction just fully closed and a flush was deferred while it
// was open, run it now. We key off `flushDeferred` (set only when
// `markDirty` actually wanted to flush) rather than `dirty`, so persist
// modes that don't flush per-write — e.g. `on-disconnect` — still defer to
// close() instead of flushing on every COMMIT.
if (!this.inTransaction && this.flushDeferred) {
this.flushDeferred = false;
void this.flush();
}
}

/** Hint that a mutation just executed; schedule a flush if needed. */
markDirty(method?: string): void {
if (this.isEphemeral || !this.fs) return;
Expand All @@ -180,46 +239,49 @@ export class WasmSqliteConnection {
// 'on-disconnect' → flush only at close()
}

/** Force a write of the current database state to disk. */
/**
* Force a write of the current database state to disk.
*
* Flushes are strictly serialized through a single promise chain: every call
* appends an export+write step that runs after all previously-queued steps.
* This matters because sql.js `export()` mutates the live connection (it
* closes and reopens the database), so two exports must never overlap — and
* because the returned promise must not resolve until the caller's own write
* has hit disk (deterministic for tests and for `close()`). Each step
* re-checks `dirty` at run time, so a no-op write collapses cheaply and a
* write that arrived mid-flush is captured by the next queued step.
*/
async flush(): Promise<void> {
if (this.isEphemeral || !this.fs || this.destroyed) return;
// If a flush is already in flight, wait for it and then re-flush so any
// writes that arrived after the in-flight flush's `db.export()` call get
// persisted too. Without this, on-write mode loses writes that happen
// between a flush's synchronous export and its async file write.
if (this.pendingFlush) {
await this.pendingFlush;
if (!this.dirty || this.destroyed) return;
// Never export while a transaction is open: sql.js's `export()` closes and
// reopens the database, which rolls back the in-flight transaction and
// makes the subsequent COMMIT fail. Defer until the transaction closes
// (handled in `noteTransactionControl`).
if (this.inTransaction) {
this.flushDeferred = true;
return;
}
if (!this.dirty) return;

this.pendingFlush = (async () => {
const prev = this.flushChain;
const step = (prev ?? Promise.resolve()).then(async () => {
if (!this.dirty || this.destroyed || this.inTransaction) return;
// Snapshot dirty=false before export so a concurrent write re-marks us
// and is picked up by the next queued step.
this.dirty = false;
try {
// Snapshot dirty=false BEFORE export so concurrent writes that occur
// during the async writeFile mark us dirty again and trigger another
// flush via markDirty.
this.dirty = false;
const exported = this.db.export();
// sql.js returns a Uint8Array; Buffer.from on it shares memory but
// works fine for writeFile.
// works fine for the synchronous writeFile enqueue below.
await this.fs!.writeFile(this.filename, Buffer.from(exported));
} catch (err) {
// Restore dirty state so a subsequent flush retries.
this.dirty = true;
this.dirty = true; // let a later flush retry
throw err;
} finally {
this.pendingFlush = null;
}
})();

await this.pendingFlush;

// A write that arrived after `db.export()` (synchronous) but before the
// file write completed will have set dirty=true again. Re-flush to
// persist it.
if (this.dirty && !this.destroyed) {
await this.flush();
}
});
// Keep the chain tail alive but swallow its rejection there so one failed
// flush doesn't poison every future flush; the awaited `step` still throws.
this.flushChain = step.catch(() => {});
await step;
}

/** Close the database, flushing any pending writes first. */
Expand All @@ -229,6 +291,11 @@ export class WasmSqliteConnection {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
// Any transaction still open at close is abandoned and will be rolled back
// by `db.close()`; clear the flag so the final flush is not deferred and
// already-committed data is persisted.
this.rootTxActive = false;
this.savepointDepth = 0;
try {
await this.flush();
} finally {
Expand Down