From a41deb15f20e356f20286097ae4a83c04ac058d9 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 25 Feb 2026 11:39:41 +0200 Subject: [PATCH 01/12] On-demand sync - client side implementation. --- packages/powersync-db-collection/package.json | 1 + packages/powersync-db-collection/src/index.ts | 1 + .../powersync-db-collection/src/powersync.ts | 296 +++- .../src/sqlite-compiler.ts | 354 ++++ .../tests/on-demand-sync.test.ts | 1499 +++++++++++++++++ .../tests/sqlite-compiler.test.ts | 329 ++++ 6 files changed, 2428 insertions(+), 52 deletions(-) create mode 100644 packages/powersync-db-collection/src/sqlite-compiler.ts create mode 100644 packages/powersync-db-collection/tests/on-demand-sync.test.ts create mode 100644 packages/powersync-db-collection/tests/sqlite-compiler.test.ts diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index 587b3298c..0f763b3cf 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -55,6 +55,7 @@ "@standard-schema/spec": "^1.1.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", + "async-mutex": "^0.5.0", "debug": "^4.4.3", "p-defer": "^4.0.1" }, diff --git a/packages/powersync-db-collection/src/index.ts b/packages/powersync-db-collection/src/index.ts index 6879d7a22..f8d092805 100644 --- a/packages/powersync-db-collection/src/index.ts +++ b/packages/powersync-db-collection/src/index.ts @@ -1,3 +1,4 @@ export * from './definitions' export * from './powersync' export * from './PowerSyncTransactor' +export * from './sqlite-compiler' diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index bc2e85bc2..a35b0d777 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -1,10 +1,14 @@ import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common' +import { Mutex } from 'async-mutex' +import { or } from '@tanstack/db' +import { compileSQLite } from './sqlite-compiler' import { PendingOperationStore } from './PendingOperationStore' import { PowerSyncTransactor } from './PowerSyncTransactor' import { DEFAULT_BATCH_SIZE } from './definitions' import { asPowerSyncRecord, mapOperation } from './helpers' import { convertTableToSchema } from './schema' import { serializeForSQLite } from './serialization' +import type { LoadSubsetOptions, OperationType, SyncConfig } from '@tanstack/db' import type { AnyTableColumnType, ExtractedTable, @@ -24,9 +28,8 @@ import type { PowerSyncCollectionUtils, } from './definitions' import type { PendingOperation } from './PendingOperationStore' -import type { SyncConfig } from '@tanstack/db' import type { StandardSchemaV1 } from '@standard-schema/spec' -import type { Table, TriggerDiffRecord } from '@powersync/common' +import type { LockContext, Table, TriggerDiffRecord } from '@powersync/common' /** * Creates PowerSync collection options for use with a standard Collection. @@ -225,6 +228,7 @@ export function powerSyncCollectionOptions< table, schema: inputSchema, syncBatchSize = DEFAULT_BATCH_SIZE, + syncMode = 'eager', ...restConfig } = config @@ -296,11 +300,66 @@ export function powerSyncCollectionOptions< */ const sync: SyncConfig = { sync: (params) => { - const { begin, write, commit, markReady } = params + const { begin, write, collection, commit, markReady } = params const abortController = new AbortController() - // The sync function needs to be synchronous - async function start() { + let disposeTracking: (() => Promise) | null = null + + if (syncMode === `eager`) { + return runEagerSync() + } else { + return runOnDemandSync() + } + + async function createDiffTrigger(options: { + when: Record + writeType: (rowId: string) => OperationType + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => Promise> + onReady: () => void + }) { + const { when, writeType, batchQuery, onReady } = options + + return await database.triggers.createDiffTrigger({ + source: viewName, + destination: trackedTableName, + when, + hooks: { + beforeCreate: async (context) => { + let currentBatchCount = syncBatchSize + let cursor = 0 + while (currentBatchCount == syncBatchSize) { + begin() + + const batchItems = await batchQuery( + context, + syncBatchSize, + cursor, + ) + currentBatchCount = batchItems.length + cursor += currentBatchCount + for (const row of batchItems) { + write({ + type: writeType(row.id), + value: deserializeSyncRow(row), + }) + } + commit() + } + onReady() + database.logger.info( + `Sync is ready for ${viewName} into ${trackedTableName}`, + ) + }, + }, + }) + } + + // The sync function needs to be synchronous. + async function start(afterOnChangeRegistered?: () => Promise) { database.logger.info( `Sync is starting for ${viewName} into ${trackedTableName}`, ) @@ -362,68 +421,200 @@ export function powerSyncCollectionOptions< }, ) - const disposeTracking = await database.triggers.createDiffTrigger({ - source: viewName, - destination: trackedTableName, - when: { - [DiffTriggerOperation.INSERT]: `TRUE`, - [DiffTriggerOperation.UPDATE]: `TRUE`, - [DiffTriggerOperation.DELETE]: `TRUE`, - }, - hooks: { - beforeCreate: async (context) => { - let currentBatchCount = syncBatchSize - let cursor = 0 - while (currentBatchCount == syncBatchSize) { - begin() - const batchItems = await context.getAll( - sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`, - [syncBatchSize, cursor], - ) - currentBatchCount = batchItems.length - cursor += currentBatchCount - for (const row of batchItems) { - write({ - type: `insert`, - value: deserializeSyncRow(row), - }) - } - commit() - } - markReady() - database.logger.info( - `Sync is ready for ${viewName} into ${trackedTableName}`, - ) - }, - }, - }) + await afterOnChangeRegistered?.() // If the abort controller was aborted while processing the request above if (abortController.signal.aborted) { - await disposeTracking() + await disposeTracking?.() } else { abortController.signal.addEventListener( `abort`, () => { - disposeTracking() + disposeTracking?.() }, { once: true }, ) } } - start().catch((error) => - database.logger.error( - `Could not start syncing process for ${viewName} into ${trackedTableName}`, - error, - ), - ) + // Eager mode. + // Registers a diff trigger for the entire table. + function runEagerSync() { + start(async () => { + disposeTracking = await createDiffTrigger({ + when: { + [DiffTriggerOperation.INSERT]: `TRUE`, + [DiffTriggerOperation.UPDATE]: `TRUE`, + [DiffTriggerOperation.DELETE]: `TRUE`, + }, + writeType: (_rowId: string) => `insert`, + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => + lockContext.getAll( + sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`, + [batchSize, cursor], + ), + onReady: () => markReady(), + }) + }).catch((error) => + database.logger.error( + `Could not start syncing process for ${viewName} into ${trackedTableName}`, + error, + ), + ) + + return () => { + database.logger.info( + `Sync has been stopped for ${viewName} into ${trackedTableName}`, + ) + abortController.abort() + } + } - return () => { - database.logger.info( - `Sync has been stopped for ${viewName} into ${trackedTableName}`, + // On-demand mode. + // Registers a diff trigger for the active WHERE expressions. + function runOnDemandSync() { + start().catch((error) => + database.logger.error( + `Could not start syncing process for ${viewName} into ${trackedTableName}`, + error, + ), ) - abortController.abort() + + // Tracks all active WHERE expressions for on-demand sync filtering. + // Each loadSubset call pushes its predicate; unloadSubset removes it. + const activeWhereExpressions: Array = [] + const mutex = new Mutex() + + const loadSubset = async (options?: LoadSubsetOptions): Promise => { + if (options) { + activeWhereExpressions.push(options.where) + } + + if (activeWhereExpressions.length === 0) { + await disposeTracking?.() + return + } + + const combinedWhere = + activeWhereExpressions.length === 1 + ? activeWhereExpressions[0] + : or( + activeWhereExpressions[0]!, + activeWhereExpressions[1]!, + ...activeWhereExpressions.slice(2), + ) + + const compiledNewData = compileSQLite( + { where: combinedWhere }, + { jsonColumn: 'NEW.data' }, + ) + + const compiledOldData = compileSQLite( + { where: combinedWhere }, + { jsonColumn: 'OLD.data' }, + ) + + const compiledView = compileSQLite({ where: combinedWhere }) + + const newDataWhenClause = toInlinedWhereClause(compiledNewData) + const oldDataWhenClause = toInlinedWhereClause(compiledOldData) + const viewWhereClause = toInlinedWhereClause(compiledView) + + await disposeTracking?.() + + disposeTracking = await createDiffTrigger({ + when: { + [DiffTriggerOperation.INSERT]: newDataWhenClause, + [DiffTriggerOperation.UPDATE]: `(${newDataWhenClause}) OR (${oldDataWhenClause})`, + [DiffTriggerOperation.DELETE]: oldDataWhenClause, + }, + writeType: (rowId: string) => + collection.has(rowId) ? `update` : `insert`, + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => + lockContext.getAll( + `SELECT * FROM ${viewName} WHERE ${viewWhereClause} LIMIT ? OFFSET ?`, + [batchSize, cursor], + ), + onReady: () => {}, + }) + } + + const toInlinedWhereClause = (compiled: { + where?: string + params: Array + }): string => { + if (!compiled.where) return 'TRUE' + const sqlParts = compiled.where.split('?') + return sanitizeSQL( + sqlParts as unknown as TemplateStringsArray, + ...compiled.params, + ) + } + + const unloadSubset = async (options: LoadSubsetOptions) => { + const idx = activeWhereExpressions.indexOf(options.where) + if (idx !== -1) { + activeWhereExpressions.splice(idx, 1) + } + + // Evict rows that were exclusively loaded by the departing predicate. + // These are rows matching the departing WHERE that are no longer covered + // by any remaining active predicate. + const compiledDeparting = compileSQLite({ where: options.where }) + const departingWhereSQL = toInlinedWhereClause(compiledDeparting) + + let evictionSQL: string + if (activeWhereExpressions.length === 0) { + evictionSQL = `SELECT id FROM ${viewName} WHERE ${departingWhereSQL}` + } else { + const combinedRemaining = + activeWhereExpressions.length === 1 + ? activeWhereExpressions[0]! + : or( + activeWhereExpressions[0]!, + activeWhereExpressions[1]!, + ...activeWhereExpressions.slice(2), + ) + const compiledRemaining = compileSQLite({ where: combinedRemaining }) + const remainingWhereSQL = toInlinedWhereClause(compiledRemaining) + evictionSQL = `SELECT id FROM ${viewName} WHERE (${departingWhereSQL}) AND NOT (${remainingWhereSQL})` + } + + const rowsToEvict = await database.getAll<{ id: string }>(evictionSQL) + if (rowsToEvict.length > 0) { + begin() + for (const { id } of rowsToEvict) { + write({ type: `delete`, key: id }) + } + commit() + } + + // Recreate the diff trigger for the remaining active WHERE expressions. + await loadSubset() + } + + markReady() + + return { + cleanup: () => { + database.logger.info( + `Sync has been stopped for ${viewName} into ${trackedTableName}`, + ) + abortController.abort() + }, + loadSubset: (options: LoadSubsetOptions) => + mutex.runExclusive(() => loadSubset(options)), + unloadSubset: (options: LoadSubsetOptions) => + mutex.runExclusive(() => unloadSubset(options)), + } } }, // Expose the getSyncMetadata function @@ -442,6 +633,7 @@ export function powerSyncCollectionOptions< getKey, // Syncing should start immediately since we need to monitor the changes for mutations startSync: true, + syncMode, sync, onInsert: async (params) => { // The transaction here should only ever contain a single insert mutation diff --git a/packages/powersync-db-collection/src/sqlite-compiler.ts b/packages/powersync-db-collection/src/sqlite-compiler.ts new file mode 100644 index 000000000..e2df875fc --- /dev/null +++ b/packages/powersync-db-collection/src/sqlite-compiler.ts @@ -0,0 +1,354 @@ +import type { IR, LoadSubsetOptions } from '@tanstack/db' + +/** + * Result of compiling LoadSubsetOptions to SQLite + */ +export interface SQLiteCompiledQuery { + /** The WHERE clause (without "WHERE" keyword), e.g., "price > ?" */ + where?: string + /** The ORDER BY clause (without "ORDER BY" keyword), e.g., "price DESC" */ + orderBy?: string + /** The LIMIT value */ + limit?: number + /** Parameter values in order, to be passed to SQLite query */ + params: Array +} + +/** + * Options for controlling how SQL is compiled. + */ +export interface CompileSQLiteOptions { + /** + * When set, column references emit `json_extract(, '$.')` + * instead of `""`. The `id` column is excluded since it's stored + * as a direct column in the tracked table. + */ + jsonColumn?: string +} + +/** + * Compiles TanStack DB LoadSubsetOptions to SQLite query components. + * + * @example + * ```typescript + * const compiled = compileSQLite({ + * where: { type: 'func', name: 'gt', args: [ + * { type: 'ref', path: ['price'] }, + * { type: 'val', value: 100 } + * ]}, + * orderBy: [{ expression: { type: 'ref', path: ['price'] }, compareOptions: { direction: 'desc', nulls: 'last' } }], + * limit: 50 + * }) + * // Result: { where: '"price" > ?', orderBy: '"price" DESC', limit: 50, params: [100] } + * ``` + */ +export function compileSQLite( + options: LoadSubsetOptions, + compileOptions?: CompileSQLiteOptions, +): SQLiteCompiledQuery { + const { where, orderBy, limit } = options + + const params: Array = [] + const result: SQLiteCompiledQuery = { params } + + if (where) { + result.where = compileExpression(where, params, compileOptions) + } + + if (orderBy) { + result.orderBy = compileOrderBy(orderBy, params, compileOptions) + } + + if (limit !== undefined) { + result.limit = limit + } + + return result +} + +/** + * Quote SQLite identifiers to handle column names correctly. + * SQLite uses double quotes for identifiers. + */ +function quoteIdentifier(name: string): string { + // Escape any double quotes in the name by doubling them + const escaped = name.replace(/"/g, `""`) + return `"${escaped}"` +} + +/** + * Compiles a BasicExpression to a SQL string, mutating the params array. + */ +function compileExpression( + exp: IR.BasicExpression, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + switch (exp.type) { + case `val`: + params.push(exp.value) + return `?` + case `ref`: { + if (exp.path.length !== 1) { + throw new Error( + `SQLite compiler doesn't support nested properties: ${exp.path.join(`.`)}`, + ) + } + const columnName = exp.path[0]! + if (compileOptions?.jsonColumn && columnName !== `id`) { + return `json_extract(${compileOptions.jsonColumn}, '$.${columnName}')` + } + return quoteIdentifier(columnName) + } + case `func`: + return compileFunction(exp, params, compileOptions) + default: + throw new Error(`Unknown expression type: ${(exp as any).type}`) + } +} + +/** + * Compiles an OrderBy array to a SQL ORDER BY clause. + */ +function compileOrderBy( + orderBy: IR.OrderBy, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const clauses = orderBy.map((clause: IR.OrderByClause) => + compileOrderByClause(clause, params, compileOptions), + ) + return clauses.join(`, `) +} + +/** + * Compiles a single OrderByClause to SQL. + */ +function compileOrderByClause( + clause: IR.OrderByClause, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const { expression, compareOptions } = clause + let sql = compileExpression(expression, params, compileOptions) + + if (compareOptions.direction === `desc`) { + sql = `${sql} DESC` + } + + // SQLite supports NULLS FIRST/LAST (since 3.30.0) + if (compareOptions.nulls === `first`) { + sql = `${sql} NULLS FIRST` + } else { + // Default to NULLS LAST (nulls === 'last') + sql = `${sql} NULLS LAST` + } + + return sql +} + +/** + * Check if a BasicExpression represents a null/undefined value + */ +function isNullValue(exp: IR.BasicExpression): boolean { + return exp.type === `val` && (exp.value === null || exp.value === undefined) +} + +/** + * Compiles a function expression (operator) to SQL. + */ +function compileFunction( + exp: IR.Func, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const { name, args } = exp + + // Check for null values in comparison operators + if (isComparisonOp(name)) { + const hasNullArg = args.some((arg: IR.BasicExpression) => isNullValue(arg)) + if (hasNullArg) { + throw new Error( + `Cannot use null/undefined with '${name}' operator. ` + + `Use isNull() to check for null values.`, + ) + } + } + + // Compile arguments + const compiledArgs = args.map((arg: IR.BasicExpression) => + compileExpression(arg, params, compileOptions), + ) + + // Handle different operator types + switch (name) { + // Binary comparison operators + case `eq`: + case `gt`: + case `gte`: + case `lt`: + case `lte`: { + if (compiledArgs.length !== 2) { + throw new Error(`${name} expects 2 arguments`) + } + const opSymbol = getComparisonOp(name) + return `${compiledArgs[0]} ${opSymbol} ${compiledArgs[1]}` + } + + // Logical operators + case `and`: + case `or`: { + if (compiledArgs.length < 2) { + throw new Error(`${name} expects at least 2 arguments`) + } + const opKeyword = name === `and` ? `AND` : `OR` + return compiledArgs + .map((arg: string) => `(${arg})`) + .join(` ${opKeyword} `) + } + + case `not`: { + if (compiledArgs.length !== 1) { + throw new Error(`not expects 1 argument`) + } + // Check if argument is isNull/isUndefined for IS NOT NULL + const arg = args[0] + if (arg && arg.type === `func`) { + if (arg.name === `isNull` || arg.name === `isUndefined`) { + const innerArg = compileExpression( + arg.args[0]!, + params, + compileOptions, + ) + return `${innerArg} IS NOT NULL` + } + } + return `NOT (${compiledArgs[0]})` + } + + // Null checking + case `isNull`: + case `isUndefined`: { + if (compiledArgs.length !== 1) { + throw new Error(`${name} expects 1 argument`) + } + return `${compiledArgs[0]} IS NULL` + } + + // IN operator + case `in`: { + if (compiledArgs.length !== 2) { + throw new Error(`in expects 2 arguments (column and array)`) + } + // The second argument should be an array value + // We need to handle this specially - expand the array into multiple placeholders + const lastParamIndex = params.length - 1 + const arrayValue = params[lastParamIndex] + + if (!Array.isArray(arrayValue)) { + throw new Error(`in operator requires an array value`) + } + + // Remove the array param and add individual values + params.pop() + const placeholders = arrayValue.map(() => { + params.push(arrayValue[params.length - lastParamIndex]) + return `?` + }) + + // Re-add individual values properly + params.length = lastParamIndex // Reset to before array + for (const val of arrayValue) { + params.push(val) + } + + return `${compiledArgs[0]} IN (${placeholders.join(`, `)})` + } + + // String operators + case `like`: { + if (compiledArgs.length !== 2) { + throw new Error(`like expects 2 arguments`) + } + return `${compiledArgs[0]} LIKE ${compiledArgs[1]}` + } + + case `ilike`: { + if (compiledArgs.length !== 2) { + throw new Error(`ilike expects 2 arguments`) + } + return `${compiledArgs[0]} LIKE ${compiledArgs[1]} COLLATE NOCASE` + } + + // String case functions + case `upper`: { + if (compiledArgs.length !== 1) { + throw new Error(`upper expects 1 argument`) + } + return `UPPER(${compiledArgs[0]})` + } + + case `lower`: { + if (compiledArgs.length !== 1) { + throw new Error(`lower expects 1 argument`) + } + return `LOWER(${compiledArgs[0]})` + } + + case `length`: { + if (compiledArgs.length !== 1) { + throw new Error(`length expects 1 argument`) + } + return `LENGTH(${compiledArgs[0]})` + } + + case `concat`: { + if (compiledArgs.length < 1) { + throw new Error(`concat expects at least 1 argument`) + } + return `CONCAT(${compiledArgs.join(`, `)})` + } + + case `add`: { + if (compiledArgs.length !== 2) { + throw new Error(`add expects 2 arguments`) + } + return `${compiledArgs[0]} + ${compiledArgs[1]}` + } + + // Null fallback + case `coalesce`: { + if (compiledArgs.length < 1) { + throw new Error(`coalesce expects at least 1 argument`) + } + return `COALESCE(${compiledArgs.join(`, `)})` + } + + default: + throw new Error( + `Operator '${name}' is not supported in PowerSync on-demand sync. ` + + `Supported operators: eq, gt, gte, lt, lte, and, or, not, isNull, in, like, ilike, upper, lower, length, concat, add, coalesce`, + ) + } +} + +/** + * Check if operator is a comparison operator + */ +function isComparisonOp(name: string): boolean { + return [`eq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`].includes(name) +} + +/** + * Get the SQL symbol for a comparison operator + */ +function getComparisonOp(name: string): string { + const ops: Record = { + eq: `=`, + gt: `>`, + gte: `>=`, + lt: `<`, + lte: `<=`, + } + return ops[name]! +} diff --git a/packages/powersync-db-collection/tests/on-demand-sync.test.ts b/packages/powersync-db-collection/tests/on-demand-sync.test.ts new file mode 100644 index 000000000..a23210471 --- /dev/null +++ b/packages/powersync-db-collection/tests/on-demand-sync.test.ts @@ -0,0 +1,1499 @@ +import { randomUUID } from 'node:crypto' +import { tmpdir } from 'node:os' +import { PowerSyncDatabase, Schema, Table, column } from '@powersync/node' +import { + and, + createCollection, + createLiveQueryCollection, + eq, + gt, + gte, + lt, + or, +} from '@tanstack/db' +import { describe, expect, it, onTestFinished, vi } from 'vitest' +import { powerSyncCollectionOptions } from '../src' + +const APP_SCHEMA = new Schema({ + products: new Table({ + name: column.text, + price: column.integer, + category: column.text, + }), +}) + +describe(`On-Demand Sync Mode`, () => { + async function createDatabase() { + const db = new PowerSyncDatabase({ + database: { + dbFilename: `test-on-demand-${randomUUID()}.sqlite`, + dbLocation: tmpdir(), + implementation: { type: `node:sqlite` }, + }, + schema: APP_SCHEMA, + }) + onTestFinished(async () => { + // Wait a moment for any pending cleanup operations to complete + // before closing the database to prevent "operation on closed remote" errors + await new Promise((resolve) => setTimeout(resolve, 100)) + await db.disconnectAndClear() + await db.close() + }) + await db.disconnectAndClear() + return db + } + + async function createTestProducts(db: PowerSyncDatabase) { + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES + (uuid(), 'Product A', 50, 'electronics'), + (uuid(), 'Product B', 150, 'electronics'), + (uuid(), 'Product C', 25, 'clothing'), + (uuid(), 'Product D', 200, 'electronics'), + (uuid(), 'Product E', 75, 'clothing') + `) + } + + it(`should not load any data initially in on-demand mode`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Verify data exists in SQLite + const sqliteCount = await db.get<{ count: number }>( + `SELECT COUNT(*) as count FROM products`, + ) + expect(sqliteCount.count).toBe(5) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + // Wait for collection to be ready + await collection.stateWhenReady() + + // Verify NO data was loaded into the collection + expect(collection.size).toBe(0) + }) + + it(`should load only matching data when live query is created`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Verify collection is empty initially + expect(collection.size).toBe(0) + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for loadSubset to complete and data to appear + await vi.waitFor( + () => { + // The live query should have triggered loadSubset + // Only electronics with price > 100 should match: Product B (150), Product D (200) + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify the correct products were loaded + const loadedProducts = expensiveElectronics.toArray + const names = loadedProducts.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + + // Verify prices are correct + const prices = loadedProducts.map((p) => p.price).sort((a, b) => a! - b!) + expect(prices).toEqual([150, 200]) + }) + + it(`should reactively update live query when new matching data is inserted into SQLite`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for initial data to load + await vi.waitFor( + () => { + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify initial products + let names = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + + // Now insert a new matching product directly into SQLite + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 300, 'electronics') + `) + + // Wait for the diff trigger to propagate the change to the live query + await vi.waitFor( + () => { + // Should now have 3 products: B, D, and F + expect(expensiveElectronics.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Verify all products including the new one + names = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`, `Product F`]) + + // Verify the new product's price + const productF = expensiveElectronics.toArray.find( + (p) => p.name === `Product F`, + ) + expect(productF?.price).toBe(300) + }) + + it(`should not include non-matching data inserted into SQLite`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for initial data to load + await vi.waitFor( + () => { + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify initial products + const initialNames = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(initialNames).toEqual([`Product B`, `Product D`]) + + // Insert a non-matching product: electronics but too cheap + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Cheap Electronics', 50, 'electronics') + `) + + // Insert another non-matching product: expensive but wrong category + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Expensive Clothing', 500, 'clothing') + `) + + // Wait a bit to allow any potential (incorrect) updates to propagate + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify the live query still has only the original 2 products + expect(expensiveElectronics.size).toBe(2) + + // Verify the names haven't changed + const finalNames = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(finalNames).toEqual([`Product B`, `Product D`]) + + // Verify the base collection only contains items matching active predicates + // Non-matching diff trigger items are filtered out in on-demand mode + expect(collection.size).toBe(2) // Only the 2 matching items from loadSubset + }) + + it(`should handle multiple live queries without losing predicate coverage`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 (different predicate on same collection) + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + // Products B(150) and D(200) have price > 100 + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Now insert a new product that matches LQ1 (electronics) but NOT LQ2 (price <= 100) + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Cheap Gadget', 30, 'electronics') + `) + + // The diff trigger should use the OR of both active predicates: + // (category = 'electronics') OR (price > 100) + // 'Cheap Gadget' (electronics, price=30) matches the first predicate, + // so it should reach the base collection and appear in electronicsQuery. + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) // 3 original + Cheap Gadget + }, + { timeout: 2000 }, + ) + }) + + it(`should handle three live queries with combined predicate coverage`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + // Products B(150) and D(200) have price > 100 + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // LQ3: clothing category — a third predicate to exercise the 3-arg OR path + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => clothingQuery.cleanup()) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + // Products C(25) and E(75) are clothing + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Insert a product that only matches LQ3 (clothing, cheap) + // Diff trigger must OR all three predicates to catch this + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Shirt', 40, 'clothing') + `) + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(3) // C, E + New Shirt + }, + { timeout: 2000 }, + ) + + // Verify the other queries are unaffected + expect(electronicsQuery.size).toBe(3) + expect(expensiveQuery.size).toBe(2) + }) + + it(`should stop loading data for a predicate after its live query is cleaned up`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: clothing category + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const electronicsCount = electronicsQuery.size // 3 + + // Kill LQ2 — its predicate should be removed and its rows evicted + clothingQuery.cleanup() + + // Wait for clothing rows to be evicted; collection shrinks to electronics-only + await vi.waitFor( + () => { + expect(collection.size).toBe(electronicsCount) + }, + { timeout: 2000 }, + ) + + // Insert a new clothing item — should NOT be picked up since LQ2 is gone + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Shirt', 40, 'clothing') + `) + + // Wait to allow any (incorrect) propagation + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should not have grown — clothing predicate is no longer active + expect(collection.size).toBe(electronicsCount) + + // Insert a new electronics item — should still be picked up by LQ1 + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Gadget', 99, 'electronics') + `) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) // 3 original + New Gadget + }, + { timeout: 2000 }, + ) + + // Kill LQ1 — no active predicates remain; electronics rows should be evicted + electronicsQuery.cleanup() + + await vi.waitFor( + () => { + expect(collection.size).toBe(0) + }, + { timeout: 2000 }, + ) + + // Insert items matching both former predicates — neither should be picked up + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Another Gadget', 120, 'electronics') + `) + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Another Shirt', 15, 'clothing') + `) + + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should remain empty — no active predicates + expect(collection.size).toBe(0) + }) + + describe(`Basic loadSubset behavior`, () => { + it(`should pass correct WHERE clause from live query filters to loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query using lt — only products with price < 50: Product C (25) + const cheapQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => lt(product.price, 50)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => cheapQuery.cleanup()) + + await cheapQuery.preload() + + await vi.waitFor( + () => { + expect(cheapQuery.size).toBe(1) + }, + { timeout: 2000 }, + ) + + const names = cheapQuery.toArray.map((p) => p.name) + expect(names).toEqual([`Product C`]) + }) + + it(`should pass ORDER BY and LIMIT to loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Top 2 most expensive products, ordered by price descending + const top2Query = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .orderBy(({ product }) => product.price, `desc`) + .limit(2) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => top2Query.cleanup()) + + await top2Query.preload() + + await vi.waitFor( + () => { + expect(top2Query.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const prices = top2Query.toArray.map((p) => p.price) + // Product D (200) and Product B (150) are the top 2 + expect(prices).toEqual([200, 150]) + }) + + it(`should handle complex filters (AND, OR) in loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Complex filter: (electronics AND price >= 150) OR (clothing AND price < 50) + // Matches: Product B (electronics, 150), Product D (electronics, 200), Product C (clothing, 25) + const complexQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => + or( + and( + eq(product.category, `electronics`), + gte(product.price, 150), + ), + and(eq(product.category, `clothing`), lt(product.price, 50)), + ), + ) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => complexQuery.cleanup()) + + await complexQuery.preload() + + await vi.waitFor( + () => { + expect(complexQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + const names = complexQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product C`, `Product D`]) + }) + + it(`should handle empty result from loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query for a category that doesn't exist — no matching rows + const emptyQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `furniture`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => emptyQuery.cleanup()) + + await emptyQuery.preload() + + // Give it time to process + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(emptyQuery.size).toBe(0) + expect(collection.size).toBe(0) + }) + }) + + describe(`Reactive updates via diff trigger`, () => { + it(`should handle UPDATE to an existing row that still matches the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Update Product A's price — still electronics, still matches + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute(`UPDATE products SET price = 99 WHERE id = ?`, [ + productA!.id, + ]) + + await vi.waitFor( + () => { + const updated = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + expect(updated?.price).toBe(99) + }, + { timeout: 2000 }, + ) + + // Size unchanged — same row, just updated + expect(electronicsQuery.size).toBe(3) + }) + + it(`should handle UPDATE that causes a row to no longer match the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Change Product A from electronics to clothing — no longer matches + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute( + `UPDATE products SET category = 'clothing' WHERE id = ?`, + [productA!.id], + ) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + }) + + it(`should handle UPDATE that causes a row to start matching the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Change Product C from clothing to electronics — now matches + // Product C has id we need to look up from SQLite directly + const productC = await db.get<{ id: string }>( + `SELECT id FROM products WHERE name = 'Product C'`, + ) + await db.execute( + `UPDATE products SET category = 'electronics' WHERE id = ?`, + [productC.id], + ) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([ + `Product A`, + `Product B`, + `Product C`, + `Product D`, + ]) + }) + + it(`should handle DELETE of a matching row`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Delete Product A + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute(`DELETE FROM products WHERE id = ?`, [productA!.id]) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + }) + }) + + describe(`Unload / cleanup`, () => { + it(`should handle rapid create-and-destroy of live queries without errors`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Rapidly create and destroy 5 live queries + for (let i = 0; i < 5; i++) { + const query = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + query.cleanup() + } + + // Give time for any async cleanup to settle + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should still be functional — create one more and verify it works + const finalQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => finalQuery.cleanup()) + + await finalQuery.preload() + + await vi.waitFor( + () => { + expect(finalQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle re-creating a live query with the same predicate after cleanup`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Create first query + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await query1.preload() + + await vi.waitFor( + () => { + expect(query1.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Destroy it + query1.cleanup() + + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Re-create with same predicate + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => query2.cleanup()) + + await query2.preload() + + await vi.waitFor( + () => { + expect(query2.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Verify reactive updates still work on the re-created query + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 300, 'electronics') + `) + + await vi.waitFor( + () => { + expect(query2.size).toBe(4) + }, + { timeout: 2000 }, + ) + }) + }) + + describe(`Edge cases`, () => { + it(`should handle loadSubset with no WHERE clause (load all data)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query with no WHERE — selects all products + const allQuery = createLiveQueryCollection({ + query: (q) => + q.from({ product: collection }).select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => allQuery.cleanup()) + + await allQuery.preload() + + await vi.waitFor( + () => { + expect(allQuery.size).toBe(5) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle empty result from loadSubset (no matching rows in SQLite)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const emptyQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `furniture`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => emptyQuery.cleanup()) + + await emptyQuery.preload() + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(emptyQuery.size).toBe(0) + expect(collection.size).toBe(0) + }) + + it(`should handle concurrent loadSubset calls (multiple queries preloading simultaneously)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Create three queries but don't await preload individually + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => clothingQuery.cleanup()) + + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + // Preload all concurrently + await Promise.all([ + electronicsQuery.preload(), + clothingQuery.preload(), + expensiveQuery.preload(), + ]) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) // A, B, D + expect(clothingQuery.size).toBe(2) // C, E + expect(expensiveQuery.size).toBe(2) // B, D + }, + { timeout: 2000 }, + ) + }) + }) + + describe(`Overlapping data across queries`, () => { + it(`should deduplicate rows when multiple live queries load the same data`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // LQ1: electronics category — matches A(50), B(150), D(200) + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 — matches B(150), D(200) + // Products B and D overlap with LQ1 + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Both loadSubset calls inserted rows B and D — base collection should have no duplicates + // Union of both subsets: A, B, D (B and D are shared) + const baseNames = collection.toArray.map((p: any) => p.name).sort() + expect(baseNames).toEqual([`Product A`, `Product B`, `Product D`]) + + // Both live queries return correct results over the shared data + const electronicsNames = electronicsQuery.toArray + .map((p) => p.name) + .sort() + expect(electronicsNames).toEqual([`Product A`, `Product B`, `Product D`]) + + const expensiveNames = expensiveQuery.toArray.map((p) => p.name).sort() + expect(expensiveNames).toEqual([`Product B`, `Product D`]) + + // Update a shared row — both queries should see the change + const productB = expensiveQuery.toArray.find( + (p) => p.name === `Product B`, + ) + await db.execute(`UPDATE products SET price = 175 WHERE id = ?`, [ + productB!.id, + ]) + + await vi.waitFor( + () => { + const inElectronics = electronicsQuery.toArray.find( + (p) => p.name === `Product B`, + ) + const inExpensive = expensiveQuery.toArray.find( + (p) => p.name === `Product B`, + ) + expect(inElectronics?.price).toBe(175) + expect(inExpensive?.price).toBe(175) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle changing a live query's predicate by replacing the collection`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Start with all products (no WHERE) + let liveQuery = createLiveQueryCollection({ + query: (q) => + q.from({ product: collection }).select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await liveQuery.preload() + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(5) + }, + { timeout: 2000 }, + ) + + // Switch to only electronics + liveQuery.cleanup() + + liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => liveQuery.cleanup()) + + await liveQuery.preload() + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + const names = liveQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product A`, `Product B`, `Product D`]) + + // Verify reactive updates work on the new query + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 99, 'electronics') + `) + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(4) + }, + { timeout: 2000 }, + ) + }) + }) +}) diff --git a/packages/powersync-db-collection/tests/sqlite-compiler.test.ts b/packages/powersync-db-collection/tests/sqlite-compiler.test.ts new file mode 100644 index 000000000..59c7d5d81 --- /dev/null +++ b/packages/powersync-db-collection/tests/sqlite-compiler.test.ts @@ -0,0 +1,329 @@ +import { describe, expect, it } from 'vitest' +import { IR } from '@tanstack/db' +import { compileSQLite } from '../src/sqlite-compiler' + +const val = (value: T) => new IR.Value(value) +// Helper to create expression nodes +const ref = (path: Array) => new IR.PropRef(path) +const func = (name: string, args: Array) => + new IR.Func(name, args) + +describe(`SQLite Compiler`, () => { + describe(`where clause compilation`, () => { + it(`should compile eq operator`, () => { + const result = compileSQLite({ + where: func(`eq`, [ref([`name`]), val(`test`)]), + }) + + expect(result.where).toBe(`"name" = ?`) + expect(result.params).toEqual([`test`]) + }) + + it(`should compile gt operator`, () => { + const result = compileSQLite({ + where: func(`gt`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" > ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile gte operator`, () => { + const result = compileSQLite({ + where: func(`gte`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" >= ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile lt operator`, () => { + const result = compileSQLite({ + where: func(`lt`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" < ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile lte operator`, () => { + const result = compileSQLite({ + where: func(`lte`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" <= ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile and operator with two conditions`, () => { + const result = compileSQLite({ + where: func(`and`, [ + func(`gt`, [ref([`price`]), val(50)]), + func(`lt`, [ref([`price`]), val(100)]), + ]), + }) + + expect(result.where).toBe(`("price" > ?) AND ("price" < ?)`) + expect(result.params).toEqual([50, 100]) + }) + + it(`should compile and operator with multiple conditions`, () => { + const result = compileSQLite({ + where: func(`and`, [ + func(`eq`, [ref([`status`]), val(`active`)]), + func(`gt`, [ref([`price`]), val(50)]), + func(`lt`, [ref([`price`]), val(100)]), + ]), + }) + + expect(result.where).toBe( + `("status" = ?) AND ("price" > ?) AND ("price" < ?)`, + ) + expect(result.params).toEqual([`active`, 50, 100]) + }) + + it(`should compile or operator`, () => { + const result = compileSQLite({ + where: func(`or`, [ + func(`eq`, [ref([`status`]), val(`active`)]), + func(`eq`, [ref([`status`]), val(`pending`)]), + ]), + }) + + expect(result.where).toBe(`("status" = ?) OR ("status" = ?)`) + expect(result.params).toEqual([`active`, `pending`]) + }) + + it(`should compile isNull operator`, () => { + const result = compileSQLite({ + where: func(`isNull`, [ref([`deleted_at`])]), + }) + + expect(result.where).toBe(`"deleted_at" IS NULL`) + expect(result.params).toEqual([]) + }) + + it(`should compile not(isNull) as IS NOT NULL`, () => { + const result = compileSQLite({ + where: func(`not`, [func(`isNull`, [ref([`deleted_at`])])]), + }) + + expect(result.where).toBe(`"deleted_at" IS NOT NULL`) + expect(result.params).toEqual([]) + }) + + it(`should compile like operator`, () => { + const result = compileSQLite({ + where: func(`like`, [ref([`name`]), val(`%test%`)]), + }) + + expect(result.where).toBe(`"name" LIKE ?`) + expect(result.params).toEqual([`%test%`]) + }) + + it(`should escape quotes in column names`, () => { + const result = compileSQLite({ + where: func(`eq`, [ref([`col"name`]), val(`test`)]), + }) + + expect(result.where).toBe(`"col""name" = ?`) + }) + + it(`should throw error for null values in comparison operators`, () => { + expect(() => + compileSQLite({ + where: func(`eq`, [ref([`name`]), val(null)]), + }), + ).toThrow(`Cannot use null/undefined with 'eq' operator`) + }) + + it(`should compile ilike operator`, () => { + const result = compileSQLite({ + where: func(`ilike`, [ref([`name`]), val(`%test%`)]), + }) + + expect(result.where).toBe(`"name" LIKE ? COLLATE NOCASE`) + expect(result.params).toEqual([`%test%`]) + }) + + it(`should compile upper function`, () => { + const result = compileSQLite({ + where: func(`eq`, [func(`upper`, [ref([`name`])]), val(`TEST`)]), + }) + + expect(result.where).toBe(`UPPER("name") = ?`) + expect(result.params).toEqual([`TEST`]) + }) + + it(`should compile lower function`, () => { + const result = compileSQLite({ + where: func(`eq`, [func(`lower`, [ref([`name`])]), val(`test`)]), + }) + + expect(result.where).toBe(`LOWER("name") = ?`) + expect(result.params).toEqual([`test`]) + }) + + it(`should compile coalesce function`, () => { + const result = compileSQLite({ + where: func(`eq`, [ + func(`coalesce`, [ref([`name`]), val(`default`)]), + val(`test`), + ]), + }) + + expect(result.where).toBe(`COALESCE("name", ?) = ?`) + expect(result.params).toEqual([`default`, `test`]) + }) + + it(`should compile length function`, () => { + const result = compileSQLite({ + where: func(`gt`, [func(`length`, [ref([`name`])]), val(5)]), + }) + + expect(result.where).toBe(`LENGTH("name") > ?`) + expect(result.params).toEqual([5]) + }) + + it(`should compile concat function with multiple args`, () => { + const result = compileSQLite({ + where: func(`eq`, [ + func(`concat`, [ref([`first_name`]), val(` `), ref([`last_name`])]), + val(`John Doe`), + ]), + }) + + expect(result.where).toBe(`CONCAT("first_name", ?, "last_name") = ?`) + expect(result.params).toEqual([` `, `John Doe`]) + }) + + it(`should compile add operator`, () => { + const result = compileSQLite({ + where: func(`gt`, [func(`add`, [ref([`price`]), val(10)]), val(100)]), + }) + + expect(result.where).toBe(`"price" + ? > ?`) + expect(result.params).toEqual([10, 100]) + }) + + it(`should throw for length with wrong arg count`, () => { + expect(() => + compileSQLite({ where: func(`length`, [ref([`a`]), ref([`b`])]) }), + ).toThrow(`length expects 1 argument`) + }) + + it(`should throw for add with wrong arg count`, () => { + expect(() => + compileSQLite({ where: func(`add`, [ref([`price`])]) }), + ).toThrow(`add expects 2 arguments`) + }) + + it(`should throw error for unsupported operators`, () => { + expect(() => + compileSQLite({ + where: func(`unsupported_op`, [ref([`name`]), val(`%test%`)]), + }), + ).toThrow(`Operator 'unsupported_op' is not supported`) + }) + }) + + describe(`orderBy compilation`, () => { + it(`should compile simple orderBy`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" NULLS LAST`) + expect(result.params).toEqual([]) + }) + + it(`should compile orderBy with desc direction`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" DESC NULLS LAST`) + }) + + it(`should compile orderBy with nulls first`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `asc`, nulls: `first` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" NULLS FIRST`) + }) + + it(`should compile multiple orderBy clauses`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`category`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe( + `"category" NULLS LAST, "price" DESC NULLS LAST`, + ) + }) + }) + + describe(`limit`, () => { + it(`should pass through limit`, () => { + const result = compileSQLite({ + limit: 50, + }) + + expect(result.limit).toBe(50) + }) + }) + + describe(`combined options`, () => { + it(`should compile where, orderBy, and limit together`, () => { + const result = compileSQLite({ + where: func(`gt`, [ref([`price`]), val(100)]), + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + limit: 10, + }) + + expect(result.where).toBe(`"price" > ?`) + expect(result.orderBy).toBe(`"price" DESC NULLS LAST`) + expect(result.limit).toBe(10) + expect(result.params).toEqual([100]) + }) + + it(`should handle empty options`, () => { + const result = compileSQLite({}) + + expect(result.where).toBeUndefined() + expect(result.orderBy).toBeUndefined() + expect(result.limit).toBeUndefined() + expect(result.params).toEqual([]) + }) + }) +}) From f9519f2d671098a9b7225532994fa9e23291fc3c Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 26 Feb 2026 09:58:06 +0200 Subject: [PATCH 02/12] Formatting/cleanup. --- .../powersync-db-collection/src/powersync.ts | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index a35b0d777..d83c3b258 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -304,7 +304,7 @@ export function powerSyncCollectionOptions< const abortController = new AbortController() let disposeTracking: (() => Promise) | null = null - + if (syncMode === `eager`) { return runEagerSync() } else { @@ -483,31 +483,33 @@ export function powerSyncCollectionOptions< error, ), ) - + // Tracks all active WHERE expressions for on-demand sync filtering. // Each loadSubset call pushes its predicate; unloadSubset removes it. const activeWhereExpressions: Array = [] const mutex = new Mutex() - const loadSubset = async (options?: LoadSubsetOptions): Promise => { + const loadSubset = async ( + options?: LoadSubsetOptions, + ): Promise => { if (options) { activeWhereExpressions.push(options.where) } - + if (activeWhereExpressions.length === 0) { await disposeTracking?.() return } - + const combinedWhere = activeWhereExpressions.length === 1 ? activeWhereExpressions[0] : or( - activeWhereExpressions[0]!, - activeWhereExpressions[1]!, + activeWhereExpressions[0], + activeWhereExpressions[1], ...activeWhereExpressions.slice(2), ) - + const compiledNewData = compileSQLite( { where: combinedWhere }, { jsonColumn: 'NEW.data' }, @@ -523,9 +525,9 @@ export function powerSyncCollectionOptions< const newDataWhenClause = toInlinedWhereClause(compiledNewData) const oldDataWhenClause = toInlinedWhereClause(compiledOldData) const viewWhereClause = toInlinedWhereClause(compiledView) - + await disposeTracking?.() - + disposeTracking = await createDiffTrigger({ when: { [DiffTriggerOperation.INSERT]: newDataWhenClause, @@ -558,7 +560,7 @@ export function powerSyncCollectionOptions< ...compiled.params, ) } - + const unloadSubset = async (options: LoadSubsetOptions) => { const idx = activeWhereExpressions.indexOf(options.where) if (idx !== -1) { @@ -579,11 +581,13 @@ export function powerSyncCollectionOptions< activeWhereExpressions.length === 1 ? activeWhereExpressions[0]! : or( - activeWhereExpressions[0]!, - activeWhereExpressions[1]!, + activeWhereExpressions[0], + activeWhereExpressions[1], ...activeWhereExpressions.slice(2), ) - const compiledRemaining = compileSQLite({ where: combinedRemaining }) + const compiledRemaining = compileSQLite({ + where: combinedRemaining, + }) const remainingWhereSQL = toInlinedWhereClause(compiledRemaining) evictionSQL = `SELECT id FROM ${viewName} WHERE (${departingWhereSQL}) AND NOT (${remainingWhereSQL})` } @@ -600,9 +604,9 @@ export function powerSyncCollectionOptions< // Recreate the diff trigger for the remaining active WHERE expressions. await loadSubset() } - + markReady() - + return { cleanup: () => { database.logger.info( From 74eef39e3f8aea3fa462d7bf05b96deaab04edf1 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 26 Feb 2026 10:12:37 +0200 Subject: [PATCH 03/12] Added test to confirm unload deletes don't affect sqlite database. --- .../tests/on-demand-sync.test.ts | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/powersync-db-collection/tests/on-demand-sync.test.ts b/packages/powersync-db-collection/tests/on-demand-sync.test.ts index a23210471..96ca25c01 100644 --- a/packages/powersync-db-collection/tests/on-demand-sync.test.ts +++ b/packages/powersync-db-collection/tests/on-demand-sync.test.ts @@ -1165,6 +1165,60 @@ describe(`On-Demand Sync Mode`, () => { { timeout: 2000 }, ) }) + + it(`should evict rows from collection but preserve them in the SQLite database`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Clean up the live query — triggers unload/eviction + electronicsQuery.cleanup() + + // Wait for eviction to complete + await vi.waitFor( + () => { + expect(collection.size).toBe(0) + }, + { timeout: 2000 }, + ) + + // Verify the rows still exist in the underlying SQLite database + const sqliteRows = await db.getAll( + `SELECT * FROM products WHERE category = 'electronics'`, + ) + expect(sqliteRows).toHaveLength(3) + }) }) describe(`Edge cases`, () => { From 454f88a3f49c6aaf73d293ac7202fee7e7251c1b Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 4 Mar 2026 14:37:31 +0200 Subject: [PATCH 04/12] Handling edge cases where subsets are updating while a mutation is pending. --- .../powersync-db-collection/src/powersync.ts | 122 +++++----- .../tests/on-demand-sync.test.ts | 215 ++++++++++++++++++ 2 files changed, 286 insertions(+), 51 deletions(-) diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index d83c3b258..ccf69623e 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -358,6 +358,55 @@ export function powerSyncCollectionOptions< }) } + async function flushDiffRecords(): Promise { + await database + .writeTransaction(async (context) => { + begin() + const operations = await context.getAll( + `SELECT * FROM ${trackedTableName} ORDER BY timestamp ASC`, + ) + const pendingOperations: Array = [] + + for (const op of operations) { + const { id, operation, timestamp, value } = op + const parsedValue = deserializeSyncRow({ + id, + ...JSON.parse(value), + }) + const parsedPreviousValue = + op.operation == DiffTriggerOperation.UPDATE + ? deserializeSyncRow({ + id, + ...JSON.parse(op.previous_value), + }) + : undefined + write({ + type: mapOperation(operation), + value: parsedValue, + previousValue: parsedPreviousValue, + }) + pendingOperations.push({ + id, + operation, + timestamp, + tableName: viewName, + }) + } + + // clear the current operations + await context.execute(`DELETE FROM ${trackedTableName}`) + + commit() + pendingOperationStore.resolvePendingFor(pendingOperations) + }) + .catch((error) => { + database.logger.error( + `An error has been detected in the sync handler`, + error, + ) + }) + } + // The sync function needs to be synchronous. async function start(afterOnChangeRegistered?: () => Promise) { database.logger.info( @@ -366,52 +415,7 @@ export function powerSyncCollectionOptions< database.onChangeWithCallback( { onChange: async () => { - await database - .writeTransaction(async (context) => { - begin() - const operations = await context.getAll( - `SELECT * FROM ${trackedTableName} ORDER BY timestamp ASC`, - ) - const pendingOperations: Array = [] - - for (const op of operations) { - const { id, operation, timestamp, value } = op - const parsedValue = deserializeSyncRow({ - id, - ...JSON.parse(value), - }) - const parsedPreviousValue = - op.operation == DiffTriggerOperation.UPDATE - ? deserializeSyncRow({ - id, - ...JSON.parse(op.previous_value), - }) - : undefined - write({ - type: mapOperation(operation), - value: parsedValue, - previousValue: parsedPreviousValue, - }) - pendingOperations.push({ - id, - operation, - timestamp, - tableName: viewName, - }) - } - - // clear the current operations - await context.execute(`DELETE FROM ${trackedTableName}`) - - commit() - pendingOperationStore.resolvePendingFor(pendingOperations) - }) - .catch((error) => { - database.logger.error( - `An error has been detected in the sync handler`, - error, - ) - }) + await flushDiffRecords() }, }, { @@ -487,7 +491,11 @@ export function powerSyncCollectionOptions< // Tracks all active WHERE expressions for on-demand sync filtering. // Each loadSubset call pushes its predicate; unloadSubset removes it. const activeWhereExpressions: Array = [] - const mutex = new Mutex() + // Mutex for loadSubset() and unloadSubset() calls invoked by subset changes. + const subsetMutex = new Mutex() + + // Mutex for flushDiffRecords() and disposeTracking() calls + const operationsMutex = new Mutex() const loadSubset = async ( options?: LoadSubsetOptions, @@ -497,7 +505,13 @@ export function powerSyncCollectionOptions< } if (activeWhereExpressions.length === 0) { - await disposeTracking?.() + await operationsMutex.runExclusive(async () => { + await flushDiffRecords() + }) + + await operationsMutex.runExclusive(async () => { + await disposeTracking?.() + }) return } @@ -526,7 +540,13 @@ export function powerSyncCollectionOptions< const oldDataWhenClause = toInlinedWhereClause(compiledOldData) const viewWhereClause = toInlinedWhereClause(compiledView) - await disposeTracking?.() + await operationsMutex.runExclusive(async () => { + await flushDiffRecords() + }) + + await operationsMutex.runExclusive(async () => { + await disposeTracking?.() + }) disposeTracking = await createDiffTrigger({ when: { @@ -615,9 +635,9 @@ export function powerSyncCollectionOptions< abortController.abort() }, loadSubset: (options: LoadSubsetOptions) => - mutex.runExclusive(() => loadSubset(options)), + subsetMutex.runExclusive(() => loadSubset(options)), unloadSubset: (options: LoadSubsetOptions) => - mutex.runExclusive(() => unloadSubset(options)), + subsetMutex.runExclusive(() => unloadSubset(options)), } } }, diff --git a/packages/powersync-db-collection/tests/on-demand-sync.test.ts b/packages/powersync-db-collection/tests/on-demand-sync.test.ts index 96ca25c01..c084562ec 100644 --- a/packages/powersync-db-collection/tests/on-demand-sync.test.ts +++ b/packages/powersync-db-collection/tests/on-demand-sync.test.ts @@ -1550,4 +1550,219 @@ describe(`On-Demand Sync Mode`, () => { ) }) }) + + describe(`Pending mutations during filter changes`, () => { + it(`should resolve isPersisted when loadSubset is called during a pending mutation`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Insert a new electronics product — creates a pending mutation + const insertResult = collection.insert({ + id: randomUUID(), + name: `New Gadget`, + price: 99, + category: `electronics`, + }) + + // Immediately create a second live query for clothing — triggers loadSubset + // which rebuilds the diff trigger, potentially dropping unprocessed diff records + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => clothingQuery.cleanup()) + + await clothingQuery.preload() + + // isPersisted.promise should resolve — if the bug is present, this hangs forever + await vi.waitFor( + async () => { + await insertResult.isPersisted.promise + }, + { timeout: 5000 }, + ) + }) + + it(`should resolve isPersisted when unloadSubset is called during a pending mutation`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: clothing category + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Insert a new electronics product — creates a pending mutation + const insertResult = collection.insert({ + id: randomUUID(), + name: `New Gadget`, + price: 99, + category: `electronics`, + }) + + // Immediately clean up the clothing query — triggers unloadSubset → loadSubset + // which rebuilds the diff trigger, potentially dropping unprocessed diff records + clothingQuery.cleanup() + + // isPersisted.promise should resolve — if the bug is present, this hangs forever + await vi.waitFor( + async () => { + await insertResult.isPersisted.promise + }, + { timeout: 5000 }, + ) + }) + + it(`should resolve isPersisted when all live queries are cleaned up during a pending mutation`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Start with 1 live query (electronics) + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Insert a new electronics product — creates a pending mutation + const insertResult = collection.insert({ + id: randomUUID(), + name: `New Gadget`, + price: 99, + category: `electronics`, + }) + + // Immediately clean up the only live query — triggers unloadSubset → loadSubset + // with 0 predicates (early-return path), which must still call resolveAllPendingFor + electronicsQuery.cleanup() + + // isPersisted.promise should resolve — if the bug is present, this hangs forever + await vi.waitFor( + async () => { + await insertResult.isPersisted.promise + }, + { timeout: 5000 }, + ) + }) + }) }) From 0ba0cea2b07fae43eaf29c905e5ed0b5a3c4c457 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 5 Mar 2026 15:28:02 +0200 Subject: [PATCH 05/12] Using new manageDestinationExternally option on createDiffTrigger to escape internal management of the destination table. Allows us to drop the need for mutexes. --- packages/powersync-db-collection/package.json | 5 +- .../powersync-db-collection/src/powersync.ts | 55 ++++++++++--------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index 0f763b3cf..5ed056203 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -55,7 +55,6 @@ "@standard-schema/spec": "^1.1.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", - "async-mutex": "^0.5.0", "debug": "^4.4.3", "p-defer": "^4.0.1" }, @@ -63,8 +62,8 @@ "@powersync/common": "^1.41.0" }, "devDependencies": { - "@powersync/common": "^1.44.0", - "@powersync/node": "^0.15.1", + "@powersync/common": "0.0.0-dev-20260305124002", + "@powersync/node": "0.0.0-dev-20260305124002", "@types/debug": "^4.1.12", "@vitest/coverage-istanbul": "^3.2.4" } diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index ccf69623e..0d2f5cd8e 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -1,5 +1,4 @@ import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common' -import { Mutex } from 'async-mutex' import { or } from '@tanstack/db' import { compileSQLite } from './sqlite-compiler' import { PendingOperationStore } from './PendingOperationStore' @@ -312,6 +311,7 @@ export function powerSyncCollectionOptions< } async function createDiffTrigger(options: { + manageDestinationExternally: boolean when: Record writeType: (rowId: string) => OperationType batchQuery: ( @@ -321,11 +321,18 @@ export function powerSyncCollectionOptions< ) => Promise> onReady: () => void }) { - const { when, writeType, batchQuery, onReady } = options + const { + manageDestinationExternally, + when, + writeType, + batchQuery, + onReady, + } = options return await database.triggers.createDiffTrigger({ source: viewName, destination: trackedTableName, + manageDestinationExternally, when, hooks: { beforeCreate: async (context) => { @@ -446,6 +453,7 @@ export function powerSyncCollectionOptions< function runEagerSync() { start(async () => { disposeTracking = await createDiffTrigger({ + manageDestinationExternally: false, when: { [DiffTriggerOperation.INSERT]: `TRUE`, [DiffTriggerOperation.UPDATE]: `TRUE`, @@ -491,11 +499,6 @@ export function powerSyncCollectionOptions< // Tracks all active WHERE expressions for on-demand sync filtering. // Each loadSubset call pushes its predicate; unloadSubset removes it. const activeWhereExpressions: Array = [] - // Mutex for loadSubset() and unloadSubset() calls invoked by subset changes. - const subsetMutex = new Mutex() - - // Mutex for flushDiffRecords() and disposeTracking() calls - const operationsMutex = new Mutex() const loadSubset = async ( options?: LoadSubsetOptions, @@ -505,16 +508,24 @@ export function powerSyncCollectionOptions< } if (activeWhereExpressions.length === 0) { - await operationsMutex.runExclusive(async () => { - await flushDiffRecords() - }) - - await operationsMutex.runExclusive(async () => { - await disposeTracking?.() - }) + await flushDiffRecords() + await disposeTracking?.() return } + await database.writeLock(async (context) => { + await context.execute(` + CREATE TEMP TABLE IF NOT EXISTS ${trackedTableName} ( + operation_id INTEGER PRIMARY KEY AUTOINCREMENT, + id TEXT, + operation TEXT, + timestamp TEXT, + value TEXT, + previous_value TEXT + ) + `) + }) + const combinedWhere = activeWhereExpressions.length === 1 ? activeWhereExpressions[0] @@ -540,15 +551,11 @@ export function powerSyncCollectionOptions< const oldDataWhenClause = toInlinedWhereClause(compiledOldData) const viewWhereClause = toInlinedWhereClause(compiledView) - await operationsMutex.runExclusive(async () => { - await flushDiffRecords() - }) - - await operationsMutex.runExclusive(async () => { - await disposeTracking?.() - }) + await flushDiffRecords() + await disposeTracking?.() disposeTracking = await createDiffTrigger({ + manageDestinationExternally: true, when: { [DiffTriggerOperation.INSERT]: newDataWhenClause, [DiffTriggerOperation.UPDATE]: `(${newDataWhenClause}) OR (${oldDataWhenClause})`, @@ -634,10 +641,8 @@ export function powerSyncCollectionOptions< ) abortController.abort() }, - loadSubset: (options: LoadSubsetOptions) => - subsetMutex.runExclusive(() => loadSubset(options)), - unloadSubset: (options: LoadSubsetOptions) => - subsetMutex.runExclusive(() => unloadSubset(options)), + loadSubset: (options: LoadSubsetOptions) => loadSubset(options), + unloadSubset: (options: LoadSubsetOptions) => unloadSubset(options), } } }, From 16684fc5a924b7c82d20e041ff0cd7ddbadc350e Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 6 Mar 2026 15:07:50 +0200 Subject: [PATCH 06/12] Cleanup logic for dropping the destination table, ordering by operation_id. Using createDestination table helper. --- packages/powersync-db-collection/package.json | 4 +-- .../powersync-db-collection/src/powersync.ts | 34 +++++++++++-------- .../tests/on-demand-sync.test.ts | 3 +- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index 5ed056203..083e3e29e 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -62,8 +62,8 @@ "@powersync/common": "^1.41.0" }, "devDependencies": { - "@powersync/common": "0.0.0-dev-20260305124002", - "@powersync/node": "0.0.0-dev-20260305124002", + "@powersync/common": "0.0.0-dev-20260306125455", + "@powersync/node": "0.0.0-dev-20260306125455", "@types/debug": "^4.1.12", "@vitest/coverage-istanbul": "^3.2.4" } diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index 0d2f5cd8e..bb24dd5ca 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -370,7 +370,7 @@ export function powerSyncCollectionOptions< .writeTransaction(async (context) => { begin() const operations = await context.getAll( - `SELECT * FROM ${trackedTableName} ORDER BY timestamp ASC`, + `SELECT * FROM ${trackedTableName} ORDER BY operation_id ASC`, ) const pendingOperations: Array = [] @@ -440,8 +440,22 @@ export function powerSyncCollectionOptions< } else { abortController.signal.addEventListener( `abort`, - () => { - disposeTracking?.() + async () => { + await disposeTracking?.() + + // In on-demand mode, we need to manually drop the destination table because we opt-out of internal management of the destination table. + if (syncMode === 'on-demand') { + try { + await database.execute( + `DROP TABLE IF EXISTS ${trackedTableName};`, + ) + } catch (error) { + database.logger.error( + `Could not drop tracked table ${trackedTableName}`, + error, + ) + } + } }, { once: true }, ) @@ -513,17 +527,9 @@ export function powerSyncCollectionOptions< return } - await database.writeLock(async (context) => { - await context.execute(` - CREATE TEMP TABLE IF NOT EXISTS ${trackedTableName} ( - operation_id INTEGER PRIMARY KEY AUTOINCREMENT, - id TEXT, - operation TEXT, - timestamp TEXT, - value TEXT, - previous_value TEXT - ) - `) + await database.triggers.createDiffDestinationTable(trackedTableName, { + temporary: true, + onlyIfNotExists: true, }) const combinedWhere = diff --git a/packages/powersync-db-collection/tests/on-demand-sync.test.ts b/packages/powersync-db-collection/tests/on-demand-sync.test.ts index c084562ec..1d9ea98b6 100644 --- a/packages/powersync-db-collection/tests/on-demand-sync.test.ts +++ b/packages/powersync-db-collection/tests/on-demand-sync.test.ts @@ -33,10 +33,11 @@ describe(`On-Demand Sync Mode`, () => { schema: APP_SCHEMA, }) onTestFinished(async () => { + await db.disconnectAndClear() + // Wait a moment for any pending cleanup operations to complete // before closing the database to prevent "operation on closed remote" errors await new Promise((resolve) => setTimeout(resolve, 100)) - await db.disconnectAndClear() await db.close() }) await db.disconnectAndClear() From 654faeb026b073e469eb069434a6835b2cd73a77 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 9 Mar 2026 13:55:19 +0200 Subject: [PATCH 07/12] Added onLoad/onUnload/onLoadSubset/onUnloadSubset hooks. --- packages/powersync-db-collection/package.json | 4 +- .../src/definitions.ts | 47 ++++- .../powersync-db-collection/src/powersync.ts | 6 + .../tests/sync-streams.test.ts | 169 ++++++++++++++++++ 4 files changed, 222 insertions(+), 4 deletions(-) create mode 100644 packages/powersync-db-collection/tests/sync-streams.test.ts diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index 083e3e29e..c917ea5d1 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -62,8 +62,8 @@ "@powersync/common": "^1.41.0" }, "devDependencies": { - "@powersync/common": "0.0.0-dev-20260306125455", - "@powersync/node": "0.0.0-dev-20260306125455", + "@powersync/common": "0.0.0-dev-20260309101613", + "@powersync/node": "0.0.0-dev-20260309101613", "@types/debug": "^4.1.12", "@vitest/coverage-istanbul": "^3.2.4" } diff --git a/packages/powersync-db-collection/src/definitions.ts b/packages/powersync-db-collection/src/definitions.ts index c65fa850e..81b08f1e0 100644 --- a/packages/powersync-db-collection/src/definitions.ts +++ b/packages/powersync-db-collection/src/definitions.ts @@ -4,6 +4,7 @@ import type { BaseCollectionConfig, CollectionConfig, InferSchemaOutput, + LoadSubsetOptions, } from '@tanstack/db' import type { AnyTableColumnType, @@ -162,12 +163,54 @@ export type ConfigWithArbitraryCollectionTypes< StandardSchemaV1.InferOutput > } +/** + * Eager sync mode hooks. + * Called once when the collection sync starts and stops. + */ +export type EagerSyncHooks = { + syncMode?: 'eager' + /** + * Called when the collection sync starts. + * Use this to set up external data sources (e.g. subscribing to a sync stream). + */ + onLoad?: () => void | Promise + /** + * Called when the collection sync is cleaned up. + * Use this to tear down external data sources (e.g. unsubscribing from a sync stream). + */ + onUnload?: () => void | Promise + onLoadSubset?: never + onUnloadSubset?: never +} + +/** + * On-demand sync mode hooks. + * Called each time a subset is loaded or unloaded in response to live query changes. + */ +export type OnDemandSyncHooks = { + syncMode: 'on-demand' + onLoad?: never + onUnload?: never + /** + * Called when a subset of data is requested by a live query. + * Use this to set up external data sources for the requested subset + * (e.g. subscribing to a sync stream with parameters derived from the query predicate). + */ + onLoadSubset?: (options: LoadSubsetOptions) => void | Promise + /** + * Called when a subset of data is unloaded from the collection. + * Use this to tear down external data sources for the given subset + * (e.g. unsubscribing from a sync stream). + */ + onUnloadSubset?: (options: LoadSubsetOptions) => void | Promise +} + export type BasePowerSyncCollectionConfig< TTable extends Table = Table, TSchema extends StandardSchemaV1 = never, > = Omit< BaseCollectionConfig, string, TSchema>, - `onInsert` | `onUpdate` | `onDelete` | `getKey` + `onInsert` | `onUpdate` | `onDelete` | `getKey` | `syncMode` > & { /** The PowerSync schema Table definition */ table: TTable @@ -186,7 +229,7 @@ export type BasePowerSyncCollectionConfig< * streaming of initial results, at the cost of more query calls. */ syncBatchSize?: number -} +} & (EagerSyncHooks | OnDemandSyncHooks) /** * Configuration interface for PowerSync collection options. diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index bb24dd5ca..6b242d8b2 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -466,6 +466,8 @@ export function powerSyncCollectionOptions< // Registers a diff trigger for the entire table. function runEagerSync() { start(async () => { + await restConfig.onLoad?.() + disposeTracking = await createDiffTrigger({ manageDestinationExternally: false, when: { @@ -497,6 +499,7 @@ export function powerSyncCollectionOptions< `Sync has been stopped for ${viewName} into ${trackedTableName}`, ) abortController.abort() + restConfig.onUnload?.() } } @@ -519,6 +522,7 @@ export function powerSyncCollectionOptions< ): Promise => { if (options) { activeWhereExpressions.push(options.where) + await restConfig.onLoadSubset?.(options) } if (activeWhereExpressions.length === 0) { @@ -595,6 +599,8 @@ export function powerSyncCollectionOptions< } const unloadSubset = async (options: LoadSubsetOptions) => { + await restConfig.onUnloadSubset?.(options) + const idx = activeWhereExpressions.indexOf(options.where) if (idx !== -1) { activeWhereExpressions.splice(idx, 1) diff --git a/packages/powersync-db-collection/tests/sync-streams.test.ts b/packages/powersync-db-collection/tests/sync-streams.test.ts new file mode 100644 index 000000000..a909c7834 --- /dev/null +++ b/packages/powersync-db-collection/tests/sync-streams.test.ts @@ -0,0 +1,169 @@ +import { randomUUID } from 'node:crypto' +import { tmpdir } from 'node:os' +import { PowerSyncDatabase, Schema, Table, column } from '@powersync/node' +import { + createCollection, + createLiveQueryCollection, + eq, +} from '@tanstack/db' +import { describe, expect, it, onTestFinished, vi } from 'vitest' +import { powerSyncCollectionOptions } from '../src' + +const APP_SCHEMA = new Schema({ + products: new Table({ + name: column.text, + price: column.integer, + category: column.text, + }), +}) + +describe(`Sync Streams`, () => { + async function createDatabase() { + const db = new PowerSyncDatabase({ + database: { + dbFilename: `test-sync-streams-${randomUUID()}.sqlite`, + dbLocation: tmpdir(), + implementation: { type: `node:sqlite` }, + }, + schema: APP_SCHEMA, + }) + onTestFinished(async () => { + await db.disconnectAndClear() + await new Promise((resolve) => setTimeout(resolve, 100)) + await db.close() + }) + await db.disconnectAndClear() + return db + } + + async function createTestProducts(db: PowerSyncDatabase) { + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES + (uuid(), 'Product A', 50, 'electronics'), + (uuid(), 'Product B', 150, 'electronics'), + (uuid(), 'Product C', 25, 'clothing'), + (uuid(), 'Product D', 200, 'electronics'), + (uuid(), 'Product E', 75, 'clothing') + `) + } + + it(`eager mode: should call onLoad on sync start and onUnload on cleanup`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const onLoad = vi.fn() + const onUnload = vi.fn() + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + onLoad, + onUnload, + }), + ) + + await collection.stateWhenReady() + + expect(onLoad).toHaveBeenCalledOnce() + expect(onUnload).not.toHaveBeenCalled() + + collection.cleanup() + + expect(onUnload).toHaveBeenCalledOnce() + }) + + it(`on-demand mode: should call onLoadSubset/onUnloadSubset for each live query`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const onLoadSubset = vi.fn() + const onUnloadSubset = vi.fn() + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + onLoadSubset, + onUnloadSubset, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + expect(onLoadSubset).toHaveBeenCalledTimes(1) + expect(onUnloadSubset).not.toHaveBeenCalled() + + // LQ2: clothing + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + expect(onLoadSubset).toHaveBeenCalledTimes(2) + expect(onUnloadSubset).not.toHaveBeenCalled() + + // Cleanup LQ1 — should trigger first unload + electronicsQuery.cleanup() + + await vi.waitFor( + () => { + expect(onUnloadSubset).toHaveBeenCalledTimes(1) + }, + { timeout: 2000 }, + ) + + // Cleanup LQ2 — should trigger second unload + clothingQuery.cleanup() + + await vi.waitFor( + () => { + expect(onUnloadSubset).toHaveBeenCalledTimes(2) + }, + { timeout: 2000 }, + ) + }) +}) From 0021109e732b1cbacde4774fe18b6b026f233329 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 9 Mar 2026 16:05:38 +0200 Subject: [PATCH 08/12] Updated load hooks to include cleanup function in load call instead of having dedicated unload implementation. --- .../src/definitions.ts | 25 ++++------ .../powersync-db-collection/src/powersync.ts | 19 +++++-- ...ync-streams.test.ts => load-hooks.test.ts} | 50 +++++++++++-------- 3 files changed, 52 insertions(+), 42 deletions(-) rename packages/powersync-db-collection/tests/{sync-streams.test.ts => load-hooks.test.ts} (78%) diff --git a/packages/powersync-db-collection/src/definitions.ts b/packages/powersync-db-collection/src/definitions.ts index 81b08f1e0..9c63864cd 100644 --- a/packages/powersync-db-collection/src/definitions.ts +++ b/packages/powersync-db-collection/src/definitions.ts @@ -2,6 +2,7 @@ import type { AbstractPowerSyncDatabase, Table } from '@powersync/common' import type { StandardSchemaV1 } from '@standard-schema/spec' import type { BaseCollectionConfig, + CleanupFn, CollectionConfig, InferSchemaOutput, LoadSubsetOptions, @@ -172,15 +173,11 @@ export type EagerSyncHooks = { /** * Called when the collection sync starts. * Use this to set up external data sources (e.g. subscribing to a sync stream). + * + * @returns A cleanup function that is called when the collection sync is cleaned up. */ - onLoad?: () => void | Promise - /** - * Called when the collection sync is cleaned up. - * Use this to tear down external data sources (e.g. unsubscribing from a sync stream). - */ - onUnload?: () => void | Promise + onLoad?: () => CleanupFn | void | Promise onLoadSubset?: never - onUnloadSubset?: never } /** @@ -190,19 +187,17 @@ export type EagerSyncHooks = { export type OnDemandSyncHooks = { syncMode: 'on-demand' onLoad?: never - onUnload?: never /** * Called when a subset of data is requested by a live query. * Use this to set up external data sources for the requested subset * (e.g. subscribing to a sync stream with parameters derived from the query predicate). + * + * @returns A cleanup function that is called when the subset is unloaded. */ - onLoadSubset?: (options: LoadSubsetOptions) => void | Promise - /** - * Called when a subset of data is unloaded from the collection. - * Use this to tear down external data sources for the given subset - * (e.g. unsubscribing from a sync stream). - */ - onUnloadSubset?: (options: LoadSubsetOptions) => void | Promise + + onLoadSubset?: ( + options: LoadSubsetOptions, + ) => CleanupFn | void | Promise } export type BasePowerSyncCollectionConfig< diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index 6b242d8b2..470a2fd27 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -7,7 +7,12 @@ import { DEFAULT_BATCH_SIZE } from './definitions' import { asPowerSyncRecord, mapOperation } from './helpers' import { convertTableToSchema } from './schema' import { serializeForSQLite } from './serialization' -import type { LoadSubsetOptions, OperationType, SyncConfig } from '@tanstack/db' +import type { + CleanupFn, + LoadSubsetOptions, + OperationType, + SyncConfig, +} from '@tanstack/db' import type { AnyTableColumnType, ExtractedTable, @@ -465,8 +470,10 @@ export function powerSyncCollectionOptions< // Eager mode. // Registers a diff trigger for the entire table. function runEagerSync() { + let cleanup: CleanupFn | void | null = null + start(async () => { - await restConfig.onLoad?.() + cleanup = await restConfig.onLoad?.() disposeTracking = await createDiffTrigger({ manageDestinationExternally: false, @@ -499,13 +506,15 @@ export function powerSyncCollectionOptions< `Sync has been stopped for ${viewName} into ${trackedTableName}`, ) abortController.abort() - restConfig.onUnload?.() + cleanup?.() } } // On-demand mode. // Registers a diff trigger for the active WHERE expressions. function runOnDemandSync() { + let cleanup: CleanupFn | void | null = null + start().catch((error) => database.logger.error( `Could not start syncing process for ${viewName} into ${trackedTableName}`, @@ -522,7 +531,7 @@ export function powerSyncCollectionOptions< ): Promise => { if (options) { activeWhereExpressions.push(options.where) - await restConfig.onLoadSubset?.(options) + cleanup = await restConfig.onLoadSubset?.(options) } if (activeWhereExpressions.length === 0) { @@ -599,7 +608,7 @@ export function powerSyncCollectionOptions< } const unloadSubset = async (options: LoadSubsetOptions) => { - await restConfig.onUnloadSubset?.(options) + cleanup?.() const idx = activeWhereExpressions.indexOf(options.where) if (idx !== -1) { diff --git a/packages/powersync-db-collection/tests/sync-streams.test.ts b/packages/powersync-db-collection/tests/load-hooks.test.ts similarity index 78% rename from packages/powersync-db-collection/tests/sync-streams.test.ts rename to packages/powersync-db-collection/tests/load-hooks.test.ts index a909c7834..6a3f5cb1e 100644 --- a/packages/powersync-db-collection/tests/sync-streams.test.ts +++ b/packages/powersync-db-collection/tests/load-hooks.test.ts @@ -1,11 +1,7 @@ import { randomUUID } from 'node:crypto' import { tmpdir } from 'node:os' import { PowerSyncDatabase, Schema, Table, column } from '@powersync/node' -import { - createCollection, - createLiveQueryCollection, - eq, -} from '@tanstack/db' +import { createCollection, createLiveQueryCollection, eq } from '@tanstack/db' import { describe, expect, it, onTestFinished, vi } from 'vitest' import { powerSyncCollectionOptions } from '../src' @@ -52,42 +48,52 @@ describe(`Sync Streams`, () => { const db = await createDatabase() await createTestProducts(db) - const onLoad = vi.fn() - const onUnload = vi.fn() + const onLoadMock = vi.fn() + const onUnloadMock = vi.fn() const collection = createCollection( powerSyncCollectionOptions({ database: db, table: APP_SCHEMA.props.products, - onLoad, - onUnload, + onLoad: async () => { + await onLoadMock() + + return () => { + onUnloadMock() + } + }, }), ) await collection.stateWhenReady() - expect(onLoad).toHaveBeenCalledOnce() - expect(onUnload).not.toHaveBeenCalled() + expect(onLoadMock).toHaveBeenCalledOnce() + expect(onUnloadMock).not.toHaveBeenCalled() collection.cleanup() - expect(onUnload).toHaveBeenCalledOnce() + expect(onUnloadMock).toHaveBeenCalledOnce() }) it(`on-demand mode: should call onLoadSubset/onUnloadSubset for each live query`, async () => { const db = await createDatabase() await createTestProducts(db) - const onLoadSubset = vi.fn() - const onUnloadSubset = vi.fn() + const onLoadSubsetMock = vi.fn() + const onUnloadSubsetMock = vi.fn() const collection = createCollection( powerSyncCollectionOptions({ database: db, table: APP_SCHEMA.props.products, syncMode: `on-demand`, - onLoadSubset, - onUnloadSubset, + onLoadSubset: () => { + onLoadSubsetMock() + + return () => { + onUnloadSubsetMock() + } + }, }), ) onTestFinished(() => collection.cleanup()) @@ -117,8 +123,8 @@ describe(`Sync Streams`, () => { { timeout: 2000 }, ) - expect(onLoadSubset).toHaveBeenCalledTimes(1) - expect(onUnloadSubset).not.toHaveBeenCalled() + expect(onLoadSubsetMock).toHaveBeenCalledTimes(1) + expect(onUnloadSubsetMock).not.toHaveBeenCalled() // LQ2: clothing const clothingQuery = createLiveQueryCollection({ @@ -143,15 +149,15 @@ describe(`Sync Streams`, () => { { timeout: 2000 }, ) - expect(onLoadSubset).toHaveBeenCalledTimes(2) - expect(onUnloadSubset).not.toHaveBeenCalled() + expect(onLoadSubsetMock).toHaveBeenCalledTimes(2) + expect(onUnloadSubsetMock).not.toHaveBeenCalled() // Cleanup LQ1 — should trigger first unload electronicsQuery.cleanup() await vi.waitFor( () => { - expect(onUnloadSubset).toHaveBeenCalledTimes(1) + expect(onUnloadSubsetMock).toHaveBeenCalledTimes(1) }, { timeout: 2000 }, ) @@ -161,7 +167,7 @@ describe(`Sync Streams`, () => { await vi.waitFor( () => { - expect(onUnloadSubset).toHaveBeenCalledTimes(2) + expect(onUnloadSubsetMock).toHaveBeenCalledTimes(2) }, { timeout: 2000 }, ) From 4ed91f5f5250e6625b7da2ad3aa9404af6ee6c1f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 10 Mar 2026 10:28:06 +0200 Subject: [PATCH 09/12] sync stream example --- .../sync-stream-examples.md | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 packages/powersync-db-collection/sync-stream-examples.md diff --git a/packages/powersync-db-collection/sync-stream-examples.md b/packages/powersync-db-collection/sync-stream-examples.md new file mode 100644 index 000000000..fed0e6054 --- /dev/null +++ b/packages/powersync-db-collection/sync-stream-examples.md @@ -0,0 +1,223 @@ +## Calling sync streams automatically + +For the these examples we assuming the follow sync stream exists: + +``` +streams: + lists: + query: SELECT * FROM lists WHERE owner_id = auth.user_id() + auto_subscribe: true + todos: + query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) + +config: + edition: 2 +``` + +### Example 1: Eager mode basic usage + +```typescript +const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: AppSchema.props.todos, + syncMode: `eager`, + onLoad: async () => { + console.log('onLoad') + const subscription = await db + .syncStream('todos', { list: '368b41f1-72fd-4a81-92ad-190711d72435' }) + .subscribe({ ttl: 0 }) + + await subscription.waitForFirstSync() + + return () => { + console.log('onUnload') + subscription.unsubscribe() + } + }, + }), +) +``` + +### Example 2: On-demand basic usage + +```typescript +const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: AppSchema.props.todos, + syncMode: `on-demand`, + onLoadSubset: async (options) => { + console.log('onLoadSubset') + const subscription = await db + .syncStream('todos', { list: '368b41f1-72fd-4a81-92ad-190711d72435' }) + .subscribe({ ttl: 0 }) + + await subscription.waitForFirstSync() + + return () => { + console.log('onUnloadSubset') + subscription.unsubscribe() + } + }, + }), +) +``` + +### Example 3: Extract a single filter value using `extractSimpleComparisons` + +Given a live query like: + +``` +.where(({ todo }) => eq(todo.list_id, selectedListId)) +``` + +`onLoadSubset` receives options.where as an expression tree for eq(list_id, ''). +We parse it to get the `list_id` value and pass it to `syncStream`. + +#### Collection + +```typescript +const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: AppSchema.props.todos, + syncMode: 'on-demand', + onLoadSubset: async (options) => { + // Extract simple comparisons from the where expression + const comparisons = extractSimpleComparisons(options.where) + // comparisons = [{ field: ['todo', 'list_id'], operator: 'eq', value: '' }] + + // Find the list_id filter + const listIdFilter = comparisons.find( + (c) => c.field.includes('list_id') && c.operator === 'eq', + ) + + if (!listIdFilter) { + console.warn('No list_id filter found, skipping sync stream') + return + } + + console.log(`Subscribing to todos for list: ${listIdFilter.value}`) + + const subscription = await db + .syncStream('todos', { list: listIdFilter.value }) + .subscribe({ ttl: 0 }) + + await subscription.waitForFirstSync() + + return () => { + console.log(`Unsubscribing from todos for list: ${listIdFilter.value}`) + subscription.unsubscribe() + } + }, + }), +) +``` + +#### Live query + +Simple filter -> triggers `onLoadSubset` with `eq(list_id, '...')` + +```typescript +const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ todo: collection }) + .where( + ({ todo }) => eq(todo.list_id, '368b41f1-72fd-4a81-92ad-190711d72435'), // or some listId variable + ) + .select(({ todo }) => ({ + id: todo.id, + description: todo.description, + completed: todo.completed, + })), +}) +``` + +### Example 4: Use `parseWhereExpression` with custom handlers + +`parseWhereExpression` gives you full control over how each operator is handled. +Here we build a params object for syncStream from the expression tree. + +Assume a small adjustment to the sync stream definition of todos (adding the `completed` subscription parameter) + +``` +todos: + query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND completed = subscription.parameter("completed") AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) +``` + +Note: We keep the `list` parameter name as is (consistent with most of our examples), but to correctly work with the following example we need to map it to `list_id`. You may opt to name it as `list_id` in the sync stream definition and skip the mapping process. + +#### Collection + +```typescript +const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: AppSchema.props.todos, + syncMode: 'on-demand', + onLoadSubset: async (options) => { + // Parse the where into a flat params record using custom handlers + const streamParams = parseWhereExpression(options.where, { + handlers: { + eq: (field: Array, value: unknown) => { + const mappedField = mapFields(field[field.length - 1]!) + + return { + [mappedField]: value, + } + }, + and: (...filters: Array>) => + Object.assign({}, ...filters), + }, + onUnknownOperator: (op, _args) => { + console.warn(`Ignoring unsupported operator in stream params: ${op}`) + return {} + }, + }) + // For a query like: where(({ todo }) => and(eq(todo.list_id, 'abc'), eq(todo.completed, 0))) + // streamParams = { list: 'abc', completed: 0 } + + if (!streamParams || Object.keys(streamParams).length === 0) { + console.warn('No stream params extracted, skipping sync stream') + return + } + + console.log( + `Subscribing to todos with params: ${JSON.stringify(streamParams)}`, + ) + + const subscription = await db + .syncStream('todos', streamParams) + .subscribe({ ttl: 0 }) + + await subscription.waitForFirstSync() + + return () => subscription.unsubscribe() + }, + }), +) +``` + +#### Live Query + +Compound filter -> triggers `onLoadSubset` with `and(eq(list_id, '...'), eq(completed, 1))` + +```typescript +const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ todo: collection }) + .where(({ todo }) => + and( + eq(todo.list_id, '368b41f1-72fd-4a81-92ad-190711d72435'), + eq(todo.completed, 1), + ), + ) + .select(({ todo }) => ({ + id: todo.id, + description: todo.description, + })), +}) +``` From 98d9f90f30ed0d56e3286cf2675e42b73ba6ec2e Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 10 Mar 2026 12:39:56 +0200 Subject: [PATCH 10/12] Updated examples. --- .../sync-stream-examples.md | 76 +++++++++++++++---- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/packages/powersync-db-collection/sync-stream-examples.md b/packages/powersync-db-collection/sync-stream-examples.md index fed0e6054..003b34d51 100644 --- a/packages/powersync-db-collection/sync-stream-examples.md +++ b/packages/powersync-db-collection/sync-stream-examples.md @@ -1,21 +1,31 @@ -## Calling sync streams automatically +## Incorporating Sync Streams +Ideally we would be able to map TanstackDB queries to sync streams automatically, if we can optimise the amount of data sync to +the sqlite database from the service we have smaller set of data that needs to be considered when syncing from the sqlite database to TanstackDB collections. + +As a stepping stone towards that, we now expose data loading hooks for both eager and on-demand sync modes that allow a user to call sync streams when a collection is defined (eager mode) or when a collection's data boundary changes based on the live queries predicates (on-demand). For the these examples we assuming the follow sync stream exists: ``` +config: + edition: 3 + streams: lists: query: SELECT * FROM lists WHERE owner_id = auth.user_id() auto_subscribe: true todos: query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) - -config: - edition: 2 ``` ### Example 1: Eager mode basic usage +If you want an eager collection to subscribe to a sync stream when a collection loads, you can use the `onLoad` hook. +The hook may return a cleanup function. + +Consider the diagram as an example. +We start with 4 todos in the PS service, only 2 todos get synced via the sync stream to the SQLite database. Because it's eager mode, both get synced from the SQLite database to the collection. Finally the TanstackDB query only returns the single todo that matches the live query predicate. + ```typescript const collection = createCollection( powerSyncCollectionOptions({ @@ -25,7 +35,7 @@ const collection = createCollection( onLoad: async () => { console.log('onLoad') const subscription = await db - .syncStream('todos', { list: '368b41f1-72fd-4a81-92ad-190711d72435' }) + .syncStream('todos', { list: 'list_1' }) .subscribe({ ttl: 0 }) await subscription.waitForFirstSync() @@ -39,8 +49,29 @@ const collection = createCollection( ) ``` +A live query that filters by completed. + +```typescript +const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ todo: collection }) + .where(({ todo }) => eq(todo.completed, 1)) + .select(({ todo }) => ({ + id: todo.id, + completed: todo.completed, + })), +}) +``` + ### Example 2: On-demand basic usage +If you want to on-demand collection to subscribe to a sync stream whenever a subset of data is loaded (when the list of live queries against the collection change), you can use the `onLoadSubset` hook. +The hook may return a cleanup function. + +Consider the diagram as an example. +We start with 4 todos in the PS service, only 2 todos get synced via the sync stream to the SQLite database. Because it's on-demand mode, only 1 todo matches gets synced from the SQLite database to the collection. Finally the TanstackDB query only returns the single todo that matches the live query predicate. + ```typescript const collection = createCollection( powerSyncCollectionOptions({ @@ -50,7 +81,7 @@ const collection = createCollection( onLoadSubset: async (options) => { console.log('onLoadSubset') const subscription = await db - .syncStream('todos', { list: '368b41f1-72fd-4a81-92ad-190711d72435' }) + .syncStream('todos', { list: 'list_1' }) .subscribe({ ttl: 0 }) await subscription.waitForFirstSync() @@ -64,6 +95,21 @@ const collection = createCollection( ) ``` +A live query that filters by completed. + +```typescript +const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ todo: collection }) + .where(({ todo }) => eq(todo.completed, 1)) + .select(({ todo }) => ({ + id: todo.id, + completed: todo.completed, + })), +}) +``` + ### Example 3: Extract a single filter value using `extractSimpleComparisons` Given a live query like: @@ -72,9 +118,12 @@ Given a live query like: .where(({ todo }) => eq(todo.list_id, selectedListId)) ``` -`onLoadSubset` receives options.where as an expression tree for eq(list_id, ''). +`onLoadSubset` receives options.where as an expression tree `for eq(list_id, '')`. We parse it to get the `list_id` value and pass it to `syncStream`. +Consider the diagram as an example. Note it differs from example 1 and 2 as it aims to illustrate `extractSimpleComparisons`. +We start with 4 todos in the PS service, the sync stream subscription criteria (`list_id = "list_1"`) is derived from the live query registered against the collection. Only 2 todos get synced via the sync stream to the SQLite database. Two todos get synced from the SQLite database to the collection. Finally the TanstackDB query returns both todos as they both match `eq(todo.list_id, 'list_id')`. + #### Collection ```typescript @@ -125,11 +174,10 @@ const liveQuery = createLiveQueryCollection({ q .from({ todo: collection }) .where( - ({ todo }) => eq(todo.list_id, '368b41f1-72fd-4a81-92ad-190711d72435'), // or some listId variable + ({ todo }) => eq(todo.list_id, 'list_id'), // or some listId variable ) .select(({ todo }) => ({ id: todo.id, - description: todo.description, completed: todo.completed, })), }) @@ -149,6 +197,9 @@ todos: Note: We keep the `list` parameter name as is (consistent with most of our examples), but to correctly work with the following example we need to map it to `list_id`. You may opt to name it as `list_id` in the sync stream definition and skip the mapping process. +Consider the diagram as an example. +We start with 4 todos in the PS service, the sync stream subscription criteria (`list_id = "list_1" and completed = 1`) is derived from the live query registered against the collection. Only 1 todo gets synced via the sync stream to the SQLite database. One todos gets synced from the SQLite database to the collection. Finally the TanstackDB query returns 1 todo that matches `eq(todo.list_id, 'list_id') and eq(todo.completed, 1)`. + #### Collection ```typescript @@ -210,14 +261,11 @@ const liveQuery = createLiveQueryCollection({ q .from({ todo: collection }) .where(({ todo }) => - and( - eq(todo.list_id, '368b41f1-72fd-4a81-92ad-190711d72435'), - eq(todo.completed, 1), - ), + and(eq(todo.list_id, 'list_1'), eq(todo.completed, 1)), ) .select(({ todo }) => ({ id: todo.id, - description: todo.description, + completed: todo.completed, })), }) ``` From d9aa57f4d13d0c078cd561c5ba0be87abc8a011f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 10 Mar 2026 12:44:46 +0200 Subject: [PATCH 11/12] Wording/formatting. --- packages/powersync-db-collection/sync-stream-examples.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/powersync-db-collection/sync-stream-examples.md b/packages/powersync-db-collection/sync-stream-examples.md index 003b34d51..726d42092 100644 --- a/packages/powersync-db-collection/sync-stream-examples.md +++ b/packages/powersync-db-collection/sync-stream-examples.md @@ -4,7 +4,7 @@ Ideally we would be able to map TanstackDB queries to sync streams automatically the sqlite database from the service we have smaller set of data that needs to be considered when syncing from the sqlite database to TanstackDB collections. As a stepping stone towards that, we now expose data loading hooks for both eager and on-demand sync modes that allow a user to call sync streams when a collection is defined (eager mode) or when a collection's data boundary changes based on the live queries predicates (on-demand). -For the these examples we assuming the follow sync stream exists: +For the these examples we are assuming the follow sync stream exists: ``` config: @@ -31,7 +31,7 @@ const collection = createCollection( powerSyncCollectionOptions({ database: db, table: AppSchema.props.todos, - syncMode: `eager`, + syncMode: 'eager', onLoad: async () => { console.log('onLoad') const subscription = await db @@ -77,7 +77,7 @@ const collection = createCollection( powerSyncCollectionOptions({ database: db, table: AppSchema.props.todos, - syncMode: `on-demand`, + syncMode: 'on-demand', onLoadSubset: async (options) => { console.log('onLoadSubset') const subscription = await db From f5f5f719ed7123f437f7c23b8f2c639c6c10274d Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 10 Mar 2026 15:15:55 +0200 Subject: [PATCH 12/12] Removed examples doc. --- .../sync-stream-examples.md | 271 ------------------ 1 file changed, 271 deletions(-) delete mode 100644 packages/powersync-db-collection/sync-stream-examples.md diff --git a/packages/powersync-db-collection/sync-stream-examples.md b/packages/powersync-db-collection/sync-stream-examples.md deleted file mode 100644 index 726d42092..000000000 --- a/packages/powersync-db-collection/sync-stream-examples.md +++ /dev/null @@ -1,271 +0,0 @@ -## Incorporating Sync Streams - -Ideally we would be able to map TanstackDB queries to sync streams automatically, if we can optimise the amount of data sync to -the sqlite database from the service we have smaller set of data that needs to be considered when syncing from the sqlite database to TanstackDB collections. - -As a stepping stone towards that, we now expose data loading hooks for both eager and on-demand sync modes that allow a user to call sync streams when a collection is defined (eager mode) or when a collection's data boundary changes based on the live queries predicates (on-demand). -For the these examples we are assuming the follow sync stream exists: - -``` -config: - edition: 3 - -streams: - lists: - query: SELECT * FROM lists WHERE owner_id = auth.user_id() - auto_subscribe: true - todos: - query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) -``` - -### Example 1: Eager mode basic usage - -If you want an eager collection to subscribe to a sync stream when a collection loads, you can use the `onLoad` hook. -The hook may return a cleanup function. - -Consider the diagram as an example. -We start with 4 todos in the PS service, only 2 todos get synced via the sync stream to the SQLite database. Because it's eager mode, both get synced from the SQLite database to the collection. Finally the TanstackDB query only returns the single todo that matches the live query predicate. - -```typescript -const collection = createCollection( - powerSyncCollectionOptions({ - database: db, - table: AppSchema.props.todos, - syncMode: 'eager', - onLoad: async () => { - console.log('onLoad') - const subscription = await db - .syncStream('todos', { list: 'list_1' }) - .subscribe({ ttl: 0 }) - - await subscription.waitForFirstSync() - - return () => { - console.log('onUnload') - subscription.unsubscribe() - } - }, - }), -) -``` - -A live query that filters by completed. - -```typescript -const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ todo: collection }) - .where(({ todo }) => eq(todo.completed, 1)) - .select(({ todo }) => ({ - id: todo.id, - completed: todo.completed, - })), -}) -``` - -### Example 2: On-demand basic usage - -If you want to on-demand collection to subscribe to a sync stream whenever a subset of data is loaded (when the list of live queries against the collection change), you can use the `onLoadSubset` hook. -The hook may return a cleanup function. - -Consider the diagram as an example. -We start with 4 todos in the PS service, only 2 todos get synced via the sync stream to the SQLite database. Because it's on-demand mode, only 1 todo matches gets synced from the SQLite database to the collection. Finally the TanstackDB query only returns the single todo that matches the live query predicate. - -```typescript -const collection = createCollection( - powerSyncCollectionOptions({ - database: db, - table: AppSchema.props.todos, - syncMode: 'on-demand', - onLoadSubset: async (options) => { - console.log('onLoadSubset') - const subscription = await db - .syncStream('todos', { list: 'list_1' }) - .subscribe({ ttl: 0 }) - - await subscription.waitForFirstSync() - - return () => { - console.log('onUnloadSubset') - subscription.unsubscribe() - } - }, - }), -) -``` - -A live query that filters by completed. - -```typescript -const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ todo: collection }) - .where(({ todo }) => eq(todo.completed, 1)) - .select(({ todo }) => ({ - id: todo.id, - completed: todo.completed, - })), -}) -``` - -### Example 3: Extract a single filter value using `extractSimpleComparisons` - -Given a live query like: - -``` -.where(({ todo }) => eq(todo.list_id, selectedListId)) -``` - -`onLoadSubset` receives options.where as an expression tree `for eq(list_id, '')`. -We parse it to get the `list_id` value and pass it to `syncStream`. - -Consider the diagram as an example. Note it differs from example 1 and 2 as it aims to illustrate `extractSimpleComparisons`. -We start with 4 todos in the PS service, the sync stream subscription criteria (`list_id = "list_1"`) is derived from the live query registered against the collection. Only 2 todos get synced via the sync stream to the SQLite database. Two todos get synced from the SQLite database to the collection. Finally the TanstackDB query returns both todos as they both match `eq(todo.list_id, 'list_id')`. - -#### Collection - -```typescript -const collection = createCollection( - powerSyncCollectionOptions({ - database: db, - table: AppSchema.props.todos, - syncMode: 'on-demand', - onLoadSubset: async (options) => { - // Extract simple comparisons from the where expression - const comparisons = extractSimpleComparisons(options.where) - // comparisons = [{ field: ['todo', 'list_id'], operator: 'eq', value: '' }] - - // Find the list_id filter - const listIdFilter = comparisons.find( - (c) => c.field.includes('list_id') && c.operator === 'eq', - ) - - if (!listIdFilter) { - console.warn('No list_id filter found, skipping sync stream') - return - } - - console.log(`Subscribing to todos for list: ${listIdFilter.value}`) - - const subscription = await db - .syncStream('todos', { list: listIdFilter.value }) - .subscribe({ ttl: 0 }) - - await subscription.waitForFirstSync() - - return () => { - console.log(`Unsubscribing from todos for list: ${listIdFilter.value}`) - subscription.unsubscribe() - } - }, - }), -) -``` - -#### Live query - -Simple filter -> triggers `onLoadSubset` with `eq(list_id, '...')` - -```typescript -const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ todo: collection }) - .where( - ({ todo }) => eq(todo.list_id, 'list_id'), // or some listId variable - ) - .select(({ todo }) => ({ - id: todo.id, - completed: todo.completed, - })), -}) -``` - -### Example 4: Use `parseWhereExpression` with custom handlers - -`parseWhereExpression` gives you full control over how each operator is handled. -Here we build a params object for syncStream from the expression tree. - -Assume a small adjustment to the sync stream definition of todos (adding the `completed` subscription parameter) - -``` -todos: - query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND completed = subscription.parameter("completed") AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) -``` - -Note: We keep the `list` parameter name as is (consistent with most of our examples), but to correctly work with the following example we need to map it to `list_id`. You may opt to name it as `list_id` in the sync stream definition and skip the mapping process. - -Consider the diagram as an example. -We start with 4 todos in the PS service, the sync stream subscription criteria (`list_id = "list_1" and completed = 1`) is derived from the live query registered against the collection. Only 1 todo gets synced via the sync stream to the SQLite database. One todos gets synced from the SQLite database to the collection. Finally the TanstackDB query returns 1 todo that matches `eq(todo.list_id, 'list_id') and eq(todo.completed, 1)`. - -#### Collection - -```typescript -const collection = createCollection( - powerSyncCollectionOptions({ - database: db, - table: AppSchema.props.todos, - syncMode: 'on-demand', - onLoadSubset: async (options) => { - // Parse the where into a flat params record using custom handlers - const streamParams = parseWhereExpression(options.where, { - handlers: { - eq: (field: Array, value: unknown) => { - const mappedField = mapFields(field[field.length - 1]!) - - return { - [mappedField]: value, - } - }, - and: (...filters: Array>) => - Object.assign({}, ...filters), - }, - onUnknownOperator: (op, _args) => { - console.warn(`Ignoring unsupported operator in stream params: ${op}`) - return {} - }, - }) - // For a query like: where(({ todo }) => and(eq(todo.list_id, 'abc'), eq(todo.completed, 0))) - // streamParams = { list: 'abc', completed: 0 } - - if (!streamParams || Object.keys(streamParams).length === 0) { - console.warn('No stream params extracted, skipping sync stream') - return - } - - console.log( - `Subscribing to todos with params: ${JSON.stringify(streamParams)}`, - ) - - const subscription = await db - .syncStream('todos', streamParams) - .subscribe({ ttl: 0 }) - - await subscription.waitForFirstSync() - - return () => subscription.unsubscribe() - }, - }), -) -``` - -#### Live Query - -Compound filter -> triggers `onLoadSubset` with `and(eq(list_id, '...'), eq(completed, 1))` - -```typescript -const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ todo: collection }) - .where(({ todo }) => - and(eq(todo.list_id, 'list_1'), eq(todo.completed, 1)), - ) - .select(({ todo }) => ({ - id: todo.id, - completed: todo.completed, - })), -}) -```