From 91bbed86f303a99ba20f7bb23fad641ba0c38daa Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 16 Jan 2026 16:59:29 +0400 Subject: [PATCH 1/4] feat: infer stats strategy from source sum of rows --- src/remote/remote.ts | 74 +++++++++++++++++++++++++++++++--------- src/sync/pg-connector.ts | 22 ++++++++++++ 2 files changed, 80 insertions(+), 16 deletions(-) diff --git a/src/remote/remote.ts b/src/remote/remote.ts index da9026a..ad0f682 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -34,6 +34,10 @@ export class Remote extends EventEmitter { static readonly optimizingDbName = PgIdentifier.fromString( "optimizing_db", ); + /* Threshold that we determine is "too few rows" for Postgres to start using indexes + * and not defaulting to table scan. + */ + private static readonly STATS_ROWS_THRESHOLD = 5_000; private readonly differ = new SchemaDiffer(); readonly optimizer: QueryOptimizer; @@ -69,14 +73,18 @@ export class Remote extends EventEmitter { source: Connectable, statsStrategy: StatisticsStrategy = { type: "pullFromSource" }, ): Promise< - { meta: { version?: string }; schema: RemoteSyncFullSchemaResponse } + { + meta: { version?: string; inferredStatsStrategy?: InferredStatsStrategy }; + schema: RemoteSyncFullSchemaResponse; + } > { await this.resetDatabase(); + + // First batch: get schema and other info in parallel (needed for stats decision) const [ restoreResult, recentQueries, fullSchema, - pulledStats, databaseInfo, ] = await Promise .allSettled([ @@ -84,7 +92,6 @@ export class Remote extends EventEmitter { this.pipeSchema(this.optimizingDbUDRL, source), this.getRecentQueries(source), this.getFullSchema(source), - this.resolveStatistics(source, statsStrategy), this.getDatabaseInfo(source), ]); @@ -92,6 +99,16 @@ export class Remote extends EventEmitter { this.differ.put(source, fullSchema.value); } + // Second: resolve stats strategy using table list from schema + const tables = fullSchema.status === "fulfilled" + ? fullSchema.value.tables + : []; + const statsResult = await this.resolveStatistics( + source, + statsStrategy, + tables, + ); + const pg = this.manager.getOrCreateConnection( this.optimizingDbUDRL, ); @@ -101,16 +118,11 @@ export class Remote extends EventEmitter { queries = recentQueries.value; } - let stats: StatisticsMode | undefined; - if (pulledStats.status === "fulfilled") { - stats = pulledStats.value; - } - await this.onSuccessfulSync( pg, source, queries, - stats, + statsResult.mode, ); return { @@ -118,6 +130,7 @@ export class Remote extends EventEmitter { version: databaseInfo.status === "fulfilled" ? databaseInfo.value.serverVersion : undefined, + inferredStatsStrategy: statsResult.strategy, }, schema: fullSchema.status === "fulfilled" ? { type: "ok", value: fullSchema.value } @@ -176,16 +189,38 @@ export class Remote extends EventEmitter { } } - private resolveStatistics( + private async resolveStatistics( source: Connectable, strategy: StatisticsStrategy, - ): Promise { - switch (strategy.type) { - case "static": - return Promise.resolve(strategy.stats); - case "pullFromSource": - return this.dumpSourceStats(source); + tables: { schemaName: string; tableName: string }[], + ): Promise { + if (strategy.type === "static") { + // Static strategy doesn't go through inference + return { mode: strategy.stats, strategy: "fromSource" }; + } + return this.decideStatsStrategy(source, tables); + } + + private async decideStatsStrategy( + source: Connectable, + tables: { schemaName: string; tableName: string }[], + ): Promise { + const connector = this.sourceManager.getConnectorFor(source); + const totalRows = await connector.getTotalRowCount(tables); + + if (totalRows < Remote.STATS_ROWS_THRESHOLD) { + log.info( + `Total rows (${totalRows}) below threshold, using default 10k stats`, + "remote", + ); + return { mode: Statistics.defaultStatsMode, strategy: "10k" }; } + + log.info( + `Total rows (${totalRows}) above threshold, pulling source stats`, + "remote", + ); + return { mode: await this.dumpSourceStats(source), strategy: "fromSource" }; } private async dumpSourceStats(source: Connectable): Promise { @@ -258,3 +293,10 @@ export type StatisticsStrategy = { type: "static"; stats: StatisticsMode; }; + +export type InferredStatsStrategy = "10k" | "fromSource"; + +type StatsResult = { + mode: StatisticsMode; + strategy: InferredStatsStrategy; +}; diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index 6c7c4a4..ba24aac 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -413,6 +413,28 @@ ORDER BY return FullSchema.parse(results.result); } + public async getTotalRowCount( + tables: { schemaName: string; tableName: string }[], + ): Promise { + if (tables.length === 0) return 0; + + // Strip surrounding quotes from identifiers (they come pre-quoted from schema dump) + const stripQuotes = (s: string) => s.replace(/^"|"$/g, ""); + const schemaNames = tables.map((t) => stripQuotes(t.schemaName)); + const tableNames = tables.map((t) => stripQuotes(t.tableName)); + + const results = await this.db.exec<{ total_rows: string }>( + `SELECT COALESCE(SUM(c.reltuples), 0)::bigint as total_rows + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN unnest($1::text[], $2::text[]) AS t(schema_name, table_name) + ON n.nspname = t.schema_name AND c.relname = t.table_name + WHERE c.relkind = 'r'`, + [schemaNames, tableNames], + ); + return Number(results[0]?.total_rows ?? 0); + } + public async getDatabaseInfo() { const results = await this.db.exec<{ serverVersion: string; From 6a3abd03b71f411844e8bf019cdb6facde05f926 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 16 Jan 2026 17:13:29 +0400 Subject: [PATCH 2/4] tests: add tests for stats inferring --- src/remote/remote.test.ts | 94 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/src/remote/remote.test.ts b/src/remote/remote.test.ts index 293fa2f..3f9d56a 100644 --- a/src/remote/remote.test.ts +++ b/src/remote/remote.test.ts @@ -247,6 +247,100 @@ Deno.test({ }, }); +Deno.test({ + name: "infers '10k' stats strategy when row count is below threshold", + sanitizeOps: false, + sanitizeResources: false, + fn: async () => { + // Create source with very few rows (below 5000 threshold) + const [sourceDb, targetDb] = await Promise.all([ + new PostgreSqlContainer("postgres:17") + .withCopyContentToContainer([ + { + content: ` + create extension pg_stat_statements; + create table small_table(id int); + insert into small_table select generate_series(1, 100); + analyze small_table; + `, + target: "/docker-entrypoint-initdb.d/init.sql", + }, + ]) + .withCommand(["-c", "shared_preload_libraries=pg_stat_statements"]) + .start(), + testSpawnTarget(), + ]); + + try { + const target = Connectable.fromString(targetDb.getConnectionUri()); + const source = Connectable.fromString(sourceDb.getConnectionUri()); + + const remote = new Remote( + target, + ConnectionManager.forLocalDatabase(), + ); + + const result = await remote.syncFrom(source); + await remote.optimizer.finish; + + assertEquals( + result.meta.inferredStatsStrategy, + "10k", + "Should infer '10k' strategy for small databases", + ); + } finally { + await Promise.all([sourceDb.stop(), targetDb.stop()]); + } + }, +}); + +Deno.test({ + name: "infers 'fromSource' stats strategy when row count is above threshold", + sanitizeOps: false, + sanitizeResources: false, + fn: async () => { + // Create source with many rows (above 5000 threshold) + const [sourceDb, targetDb] = await Promise.all([ + new PostgreSqlContainer("postgres:17") + .withCopyContentToContainer([ + { + content: ` + create extension pg_stat_statements; + create table large_table(id int); + insert into large_table select generate_series(1, 10000); + analyze large_table; + `, + target: "/docker-entrypoint-initdb.d/init.sql", + }, + ]) + .withCommand(["-c", "shared_preload_libraries=pg_stat_statements"]) + .start(), + testSpawnTarget(), + ]); + + try { + const target = Connectable.fromString(targetDb.getConnectionUri()); + const source = Connectable.fromString(sourceDb.getConnectionUri()); + + const remote = new Remote( + target, + ConnectionManager.forLocalDatabase(), + ); + + const result = await remote.syncFrom(source); + await remote.optimizer.finish; + + assertEquals( + result.meta.inferredStatsStrategy, + "fromSource", + "Should infer 'fromSource' strategy for large databases", + ); + } finally { + await Promise.all([sourceDb.stop(), targetDb.stop()]); + } + }, +}); + Deno.test({ name: "timescaledb with continuous aggregates sync correctly", sanitizeOps: false, From 6e665dd8688898e86026e99fba8bd3f4e3785d80 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 16 Jan 2026 18:42:04 +0400 Subject: [PATCH 3/4] feat: prefer PgIdentifier early on instead of removing quotes --- src/remote/remote.ts | 4 ++-- src/sync/pg-connector.ts | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/remote/remote.ts b/src/remote/remote.ts index ad0f682..d3a1160 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -192,7 +192,7 @@ export class Remote extends EventEmitter { private async resolveStatistics( source: Connectable, strategy: StatisticsStrategy, - tables: { schemaName: string; tableName: string }[], + tables: { schemaName: PgIdentifier; tableName: PgIdentifier }[], ): Promise { if (strategy.type === "static") { // Static strategy doesn't go through inference @@ -203,7 +203,7 @@ export class Remote extends EventEmitter { private async decideStatsStrategy( source: Connectable, - tables: { schemaName: string; tableName: string }[], + tables: { schemaName: PgIdentifier; tableName: PgIdentifier }[], ): Promise { const connector = this.sourceManager.getConnectorFor(source); const totalRows = await connector.getTotalRowCount(tables); diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index ba24aac..d9d8e53 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -414,14 +414,12 @@ ORDER BY } public async getTotalRowCount( - tables: { schemaName: string; tableName: string }[], + tables: { schemaName: PgIdentifier; tableName: PgIdentifier }[], ): Promise { if (tables.length === 0) return 0; - // Strip surrounding quotes from identifiers (they come pre-quoted from schema dump) - const stripQuotes = (s: string) => s.replace(/^"|"$/g, ""); - const schemaNames = tables.map((t) => stripQuotes(t.schemaName)); - const tableNames = tables.map((t) => stripQuotes(t.tableName)); + const schemaNames = tables.map((t) => t.schemaName.toString()); + const tableNames = tables.map((t) => t.tableName.toString()); const results = await this.db.exec<{ total_rows: string }>( `SELECT COALESCE(SUM(c.reltuples), 0)::bigint as total_rows From 521deea79601a963efca11905775437d701502e1 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 16 Jan 2026 18:42:23 +0400 Subject: [PATCH 4/4] feat: include materalized views in rows count --- src/sync/pg-connector.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index d9d8e53..97d5516 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -427,7 +427,7 @@ ORDER BY JOIN pg_namespace n ON n.oid = c.relnamespace JOIN unnest($1::text[], $2::text[]) AS t(schema_name, table_name) ON n.nspname = t.schema_name AND c.relname = t.table_name - WHERE c.relkind = 'r'`, + WHERE c.relkind IN ('r', 'm')`, [schemaNames, tableNames], ); return Number(results[0]?.total_rows ?? 0);