Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tx-ambient-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@objectstack/objectql": minor
"@objectstack/rest": minor
---

Robust multi-write transactions (ADR-0034). `engine.transaction()` now establishes an ambient transaction (AsyncLocalStorage) so every data operation during the callback — including internal reads performed while a write runs — binds to the active transaction's connection instead of asking the pool for another one and deadlocking on SQLite's single-connection pool. Adds a cross-object transactional batch endpoint (`POST /api/v1/data/batch`) with intra-batch `{ $ref: <opIndex> }` parent references, so a parent and its children can be created atomically in one transaction.
98 changes: 98 additions & 0 deletions packages/objectql/src/engine-ambient-transaction.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
//
// Guard for ADR-0034: while a transaction() callback runs, EVERY data op —
// including a write/read given no explicit transaction context — must bind to
// the active transaction (via the ambient AsyncLocalStorage store). Without
// this, internal queries during a write ask the pool for another connection
// and deadlock on the single-connection SQLite pool.

import { describe, it, expect, beforeEach } from 'vitest';
import { ObjectQL } from './engine.js';

function makeRecordingDriver() {
const stores = new Map<string, Map<string, any>>();
const seen: { create: Array<{ object: string; transaction: unknown }>; find: Array<{ object: string; transaction: unknown }> } = {
create: [],
find: [],
};
const storeFor = (o: string) => {
let s = stores.get(o);
if (!s) { s = new Map(); stores.set(o, s); }
return s;
};
let nextId = 0;
const driver: any = {
name: 'memory',
version: '0.0.0',
supports: {},
async connect() {},
async disconnect() {},
async checkHealth() { return true; },
async execute() { return null; },
async find(object: string, _ast: any, options: any) {
seen.find.push({ object, transaction: options?.transaction });
return Array.from(storeFor(object).values());
},
findStream() { throw new Error('not implemented'); },
async findOne(object: string) {
for (const r of storeFor(object).values()) return r;
return null;
},
async create(object: string, data: Record<string, unknown>, options: any) {
seen.create.push({ object, transaction: options?.transaction });
nextId += 1;
const id = (data.id as string) ?? `r_${nextId}`;
const row = { ...data, id };
storeFor(object).set(id, row);
return row;
},
async update(object: string, id: string, data: Record<string, unknown>) {
const s = storeFor(object);
const row = { ...s.get(id), ...data, id };
s.set(id, row);
return row;
},
async delete(object: string, id: string) { return storeFor(object).delete(id); },
async count() { return 0; },
async bulkCreate(object: string, rows: Record<string, unknown>[]) {
return Promise.all(rows.map((r) => this.create(object, r, undefined)));
},
async bulkUpdate() { return []; },
async bulkDelete() {},
async beginTransaction() { return { __trx: true, commit: async () => {}, rollback: async () => {} }; },
async commit() {},
async rollback() {},
};
return { driver, seen };
}

describe('engine ambient transaction (ADR-0034)', () => {
let engine: ObjectQL;
let seen: ReturnType<typeof makeRecordingDriver>['seen'];

beforeEach(async () => {
engine = new ObjectQL();
const d = makeRecordingDriver();
seen = d.seen;
engine.registerDriver(d.driver, true);
await engine.init();
engine.registry.registerObject({ name: 'thing', fields: { name: { type: 'text' } } } as any);
});

it('threads the active transaction into writes given NO explicit context', async () => {
await engine.transaction(async () => {
// No context passed — must inherit the transaction ambiently.
await engine.insert('thing', { name: 'A' });
await engine.insert('thing', { name: 'B' });
});
expect(seen.create.length).toBe(2);
expect(seen.create[0].transaction).toBeTruthy();
// both writes ran on the SAME transaction (no second connection)
expect(seen.create[1].transaction).toBe(seen.create[0].transaction);
});

it('does not leak a transaction to ops outside the transaction() scope', async () => {
await engine.insert('thing', { name: 'outside' });
expect(seen.create.at(-1)!.transaction).toBeUndefined();
});
});
34 changes: 30 additions & 4 deletions packages/objectql/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { AsyncLocalStorage } from 'node:async_hooks';
import { QueryAST, HookContext, ServiceObject } from '@objectstack/spec/data';
import {
EngineQueryOptions,
Expand Down Expand Up @@ -192,6 +193,16 @@ function resolveMetadataItemName(key: string, item: any): string | undefined {
* - CoreServiceName.metadata (Schema Registry)
*/
export class ObjectQL implements IDataEngine {
/**
* Ambient transaction store (ADR-0034). While a `transaction()` callback
* runs, the active transaction handle lives here so that EVERY data
* operation — including internal reads done during a write (reference
* checks, hooks, expand) — automatically binds to the same connection
* instead of asking the pool for another one and deadlocking on the
* single-connection SQLite pool.
*/
private readonly txStore = new AsyncLocalStorage<{ transaction: unknown }>();

private drivers = new Map<string, DriverInterface>();
private defaultDriver: string | null = null;
private logger: Logger;
Expand Down Expand Up @@ -617,13 +628,19 @@ export class ObjectQL implements IDataEngine {
* mask the system path.
*/
private buildDriverOptions(execCtx?: ExecutionContext, base?: any): any {
const hasTx = execCtx?.transaction !== undefined;
// The open transaction may arrive explicitly via the context, or ambiently
// via txStore when an internal query runs during a transactional write
// (ADR-0034). Explicit wins; ambient is the safety net.
const tx = execCtx?.transaction !== undefined
? execCtx.transaction
: this.txStore.getStore()?.transaction;
const hasTx = tx !== undefined;
const hasTenant = execCtx?.tenantId !== undefined;
const isSystem = execCtx?.isSystem === true;
if (!hasTx && !hasTenant && !isSystem) return base;
const opts: any = base && typeof base === 'object' ? { ...base } : {};
if (hasTx && opts.transaction === undefined) {
opts.transaction = execCtx!.transaction;
opts.transaction = tx;
}
if (hasTenant && opts.tenantId === undefined) {
opts.tenantId = execCtx!.tenantId;
Expand Down Expand Up @@ -2343,7 +2360,9 @@ export class ObjectQL implements IDataEngine {
const trx = await drv.beginTransaction();
const trxCtx = { ...(baseContext ?? {}), transaction: trx };
try {
const result = await callback(trxCtx);
// Run the callback inside the ambient transaction store so internal
// queries during writes reuse this transaction's connection (ADR-0034).
const result = await this.txStore.run({ transaction: trx }, () => callback(trxCtx));
if (drv.commit) await drv.commit(trx);
else if (drv.commitTransaction) await drv.commitTransaction(trx);
return result;
Expand Down Expand Up @@ -2803,9 +2822,16 @@ export class ScopedContext {
{ ...this.executionContext, transaction: trx },
this.engine
);
// Share the engine's ambient transaction store so internal queries during
// writes reuse this transaction's connection (ADR-0034).
const txStore = (this.engine as any)?.txStore as
| { run<R>(s: { transaction: unknown }, fn: () => R): R }
| undefined;
const runIn = <R>(fn: () => Promise<R>): Promise<R> =>
txStore ? txStore.run({ transaction: trx }, fn) : fn();

try {
const result = await callback(trxCtx);
const result = await runIn(() => callback(trxCtx));
if (driver.commit) await driver.commit(trx);
else if (driver.commitTransaction) await driver.commitTransaction(trx);
return result;
Expand Down
60 changes: 60 additions & 0 deletions packages/plugins/driver-sql/src/sql-driver-multiwrite-tx.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
//
// Regression guard for the multi-write transaction deadlock (ADR-0034 / #1604):
// two successful writes inside one transaction must commit, not hang.

import { describe, it, expect, afterEach } from 'vitest';
import os from 'node:os';
import path from 'node:path';
import fs from 'node:fs';
import { SqlDriver } from '../src/index.js';

function withTimeout<T>(p: Promise<T>, ms: number, label: string): Promise<T> {
return Promise.race([
p,
new Promise<T>((_, rej) => setTimeout(() => rej(new Error(`TIMEOUT: ${label} (deadlock)`)), ms)),
]);
}

describe('SqlDriver multi-write transaction (deadlock regression)', () => {
let driver: SqlDriver | undefined;
let file: string | undefined;

afterEach(async () => {
if (driver) await (driver as any).knex.destroy();
if (file && fs.existsSync(file)) fs.unlinkSync(file);
driver = undefined;
file = undefined;
});

async function setup() {
file = path.join(os.tmpdir(), `os-txtest-${Date.now()}-${Math.random().toString(36).slice(2)}.db`);
driver = new SqlDriver({ client: 'better-sqlite3', connection: { filename: file }, useNullAsDefault: true } as any);
const k = (driver as any).knex;
await k.schema.createTable('t', (t: any) => {
t.string('id').primary();
t.string('name');
});
return k;
}

it('commits TWO writes in one transaction without hanging', async () => {
const k = await setup();
const trx = await driver!.beginTransaction();
await withTimeout(driver!.create('t', { id: '1', name: 'A' }, { transaction: trx } as any), 6000, 'create #1');
await withTimeout(driver!.create('t', { id: '2', name: 'B' }, { transaction: trx } as any), 6000, 'create #2');
await withTimeout(driver!.commit(trx), 6000, 'commit');
const rows = await k('t').select();
expect(rows.map((r: any) => r.id).sort()).toEqual(['1', '2']);
});

it('rolls back all writes when the transaction is aborted', async () => {
const k = await setup();
const trx = await driver!.beginTransaction();
await driver!.create('t', { id: '1', name: 'A' }, { transaction: trx } as any);
await driver!.create('t', { id: '2', name: 'B' }, { transaction: trx } as any);
await driver!.rollback(trx);
const rows = await k('t').select();
expect(rows.length).toBe(0);
});
});
70 changes: 70 additions & 0 deletions packages/rest/src/rest-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4187,6 +4187,76 @@ export class RestServer {

const operations = batch.operations;

// POST /batch — cross-object transactional batch (issue #1604).
// Runs heterogeneous create/update/delete across objects in ONE engine
// transaction (commit all or roll back all). Intra-batch references:
// a field value of `{ $ref: <earlier op index> }` resolves to that op's
// created id, so a child can reference its parent (master-detail).
this.routeManager.register({
method: 'POST',
path: `${basePath}/batch`,
handler: async (req: any, res: any) => {
try {
const environmentId = isScoped ? req.params?.environmentId : undefined;
const context = await this.resolveExecCtx(environmentId, req);
if (this.enforceAuth(req, res, context)) return;
const ql = this.objectQLProvider ? await this.objectQLProvider(environmentId) : undefined;
if (!ql || typeof ql.transaction !== 'function') {
res.status(501).json({ error: 'Transactional batch not supported by this runtime' });
return;
}
const ops: any[] = Array.isArray(req.body?.operations) ? req.body.operations : [];
const max = batch.maxBatchSize ?? 200;
if (ops.length === 0) { res.json({ results: [] }); return; }
if (ops.length > max) { res.status(400).json({ error: `Batch too large (max ${max})` }); return; }

const resolveRefs = (data: any, out: any[]): any => {
if (!data || typeof data !== 'object') return data;
const result: any = Array.isArray(data) ? [] : {};
for (const [k, v] of Object.entries(data)) {
if (v && typeof v === 'object' && '$ref' in (v as any)) {
const ref = out[(v as any).$ref];
result[k] = (ref && (ref.id ?? ref._id)) ?? null;
} else {
result[k] = v;
}
}
return result;
};

const results = await ql.transaction(async (trxCtx: any) => {
const out: any[] = [];
for (const op of ops) {
const action = String(op?.action || 'create');
const object = String(op?.object || '');
if (!object) throw new Error('Each operation requires an `object`');
const data = resolveRefs(op.data, out);
if (action === 'create') {
out.push(await ql.insert(object, data, { context: trxCtx }));
} else if (action === 'update') {
const id = op.id ?? data?.id;
out.push(await ql.update(object, { ...data, id }, { context: trxCtx }));
} else if (action === 'delete') {
out.push(await ql.delete(object, { where: { id: op.id }, context: trxCtx }));
} else {
throw new Error(`Unknown batch action: ${action}`);
}
}
return out;
}, context);

res.json({ results });
} catch (error: any) {
logError('[REST] Unhandled error:', error);
sendError(res, error);
}
},
metadata: {
summary: 'Cross-object transactional batch (atomic create/update/delete across objects)',
tags: ['data', 'batch'],
},
});

// POST /data/:object/batch - Generic batch endpoint
if (batch.enableBatchEndpoint && this.protocol.batchData) {
this.routeManager.register({
Expand Down
Loading