diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index daabacf..bf44e86 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -66,7 +66,7 @@ export class QueryOptimizer extends EventEmitter { constructor( private readonly manager: ConnectionManager, - private readonly connectable: Connectable, + private connectable: Connectable, config?: { maxRetries?: number; queryTimeoutMs?: number; @@ -136,6 +136,10 @@ export class QueryOptimizer extends EventEmitter { this.target = { optimizer, statistics }; } + updateConnectable(connectable: Connectable) { + this.connectable = connectable; + } + stop() { this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY); this.queries.clear(); diff --git a/src/remote/remote-controller.test.ts b/src/remote/remote-controller.test.ts index cbf6c22..13747a6 100644 --- a/src/remote/remote-controller.test.ts +++ b/src/remote/remote-controller.test.ts @@ -6,7 +6,6 @@ import { Pool } from "pg"; import { RemoteController } from "./remote-controller.ts"; import { ConnectionManager } from "../sync/connection-manager.ts"; -import { RemoteSyncRequest } from "./remote.dto.ts"; test("controller syncs correctly", async () => { const [sourceDb, targetDb] = await Promise.all([ @@ -36,14 +35,12 @@ test("controller syncs correctly", async () => { const remote = new RemoteController(innerRemote); try { - const syncResult = await remote.onFullSync( - RemoteSyncRequest.encode({ db: source }), - ); + const syncResult = await remote.onFullSync(source); expect(syncResult.status).toEqual(200); const pool = new Pool({ - connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(), + connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(), }); const tablesAfter = await pool.query("select tablename from pg_tables where schemaname = 'public'"); @@ -89,13 +86,11 @@ test("creating an index via endpoint adds it to the optimizing db", async () => try { // First sync the database - const syncResult = await remote.onFullSync( - RemoteSyncRequest.encode({ db: source }), - ); + const syncResult = await remote.onFullSync(source); expect(syncResult.status).toEqual(200); const pool = new Pool({ - connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(), + connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(), }); // Verify no indexes exist initially @@ -152,9 +147,7 @@ test("controller returns extension error when pg_stat_statements is not installe const remote = new RemoteController(innerRemote); try { - const syncResult = await remote.onFullSync( - RemoteSyncRequest.encode({ db: source }), - ); + const syncResult = await remote.onFullSync(source); expect(syncResult.status).toEqual(200); diff --git a/src/remote/remote-controller.ts b/src/remote/remote-controller.ts index 6317827..44a7689 100644 --- a/src/remote/remote-controller.ts +++ b/src/remote/remote-controller.ts @@ -11,6 +11,7 @@ import { } from "./remote-controller.dto.ts"; import { ZodError } from "zod"; import { ExportedStats, Statistics } from "@query-doctor/core"; +import type { Connectable } from "../sync/connectable.ts"; const SyncStatus = { NOT_STARTED: "notStarted", @@ -35,6 +36,7 @@ export class RemoteController { private socket?: WebSocket; private syncResponse?: Awaited>; private syncStatus: SyncStatus = SyncStatus.NOT_STARTED; + private lastSourceDb?: Connectable; constructor( private readonly remote: Remote, @@ -84,7 +86,7 @@ export class RemoteController { // TODO: type return (Site#2402) async getStatus(): Promise { - if (!this.syncResponse || this.syncStatus !== SyncStatus.COMPLETED) { + if (!this.syncResponse) { return { status: this.syncStatus }; } const { schema, meta } = this.syncResponse; @@ -117,13 +119,8 @@ export class RemoteController { } as const; } - async onFullSync(rawBody: string): Promise { - const body = RemoteSyncRequest.safeDecode(rawBody); - if (!body.success) { - return { status: 400, body: body.error }; - } - - const { db } = body.data; + async onFullSync(db: Connectable): Promise { + this.lastSourceDb = db; try { this.syncStatus = SyncStatus.IN_PROGRESS; this.syncResponse = await this.remote.syncFrom(db, { @@ -158,6 +155,16 @@ export class RemoteController { } } + async redump(): Promise { + if (!this.lastSourceDb) { + return { + status: 400, + body: { type: "error", error: "no_source_db", message: "No source database has been synced yet" }, + }; + } + return this.onFullSync(this.lastSourceDb); + } + async onImportStats(body: unknown): Promise { let stats: ExportedStats[]; try { diff --git a/src/remote/remote.test.ts b/src/remote/remote.test.ts index f72336d..53f3496 100644 --- a/src/remote/remote.test.ts +++ b/src/remote/remote.test.ts @@ -94,7 +94,7 @@ test("syncs correctly", async () => { expect(indexNames).toEqual(expect.arrayContaining(["testing_1234"])); const pool = new Pool({ - connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(), + connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(), }); const indexesAfter = diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 69892ba..b155d79 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -35,9 +35,9 @@ type RemoteEvents = { */ export class Remote extends EventEmitter { static readonly baseDbName = PgIdentifier.fromString("postgres"); - static readonly optimizingDbName = PgIdentifier.fromString( - "optimizing_db", - ); + private static readonly optimizingDbPrefix = "optimizing_db"; + static defaultOptimizingDbPrefix = PgIdentifier.fromString(`${Remote.optimizingDbPrefix}_0`) + /* Threshold that we determine is "too few rows" for Postgres to start using indexes * and not defaulting to table scan. */ @@ -55,8 +55,9 @@ export class Remote extends EventEmitter { * destroyed and re-created on each successful sync along with the db itself */ private baseDbURL: Connectable; - /** The URL of the optimizing db */ - private readonly optimizingDbUDRL: Connectable; + private generation = 0; + /** The URL of the current generation optimizing db */ + private optimizingDbUDRL: Connectable; private isPolling = false; private queryLoader?: QueryLoader; @@ -74,7 +75,9 @@ export class Remote extends EventEmitter { ) { super(); this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName); - this.optimizingDbUDRL = targetURL.withDatabaseName(Remote.optimizingDbName); + this.optimizingDbUDRL = targetURL.withDatabaseName( + Remote.defaultOptimizingDbPrefix, + ); this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL); } @@ -210,21 +213,25 @@ export class Remote extends EventEmitter { } } - /** - * Drops and recreates the {@link Remote.optimizingDbName} db. - * - * TODO: allow juggling multiple databases in the future - */ private async resetDatabase(): Promise { - const databaseName = Remote.optimizingDbName; - log.info(`Resetting internal database: ${databaseName}`, "remote"); + const prevGeneration = this.generation; + const nextGeneration = prevGeneration + 1; + const nextDbName = PgIdentifier.fromString( + `${Remote.optimizingDbPrefix}_${nextGeneration}`, + ); + log.info(`Creating new generation database: ${nextDbName}`, "remote"); const baseDb = this.manager.getOrCreateConnection(this.baseDbURL); + await baseDb.exec(`create database ${nextDbName};`); + const prevDbName = PgIdentifier.fromString( + `${Remote.optimizingDbPrefix}_${prevGeneration}`, + ); + this.generation = nextGeneration; + this.optimizingDbUDRL = this.optimizingDbUDRL.withDatabaseName(nextDbName); + this.optimizer.updateConnectable(this.optimizingDbUDRL); // these cannot be run in the same `exec` block as that implicitly creates transactions await baseDb.exec( - // drop database does not allow parameterization - `drop database if exists ${databaseName} with (force);`, + `drop database if exists ${prevDbName} with (force);`, ); - await baseDb.exec(`create database ${databaseName};`); } private async pipeSchema( diff --git a/src/server/http.ts b/src/server/http.ts index 6808db6..490c94d 100644 --- a/src/server/http.ts +++ b/src/server/http.ts @@ -12,6 +12,7 @@ import { SyncResult } from "../sync/syncer.ts"; import * as errors from "../sync/errors.ts"; import { RemoteController } from "../remote/remote-controller.ts"; import { Connectable } from "../sync/connectable.ts"; +import { RemoteSyncRequest } from "../remote/remote.dto.ts"; import { ConnectionManager } from "../sync/connection-manager.ts"; import { Remote } from "../remote/remote.ts"; @@ -175,9 +176,11 @@ export async function createServer( if (!sourceDb) { fastify.post("/postgres", async (request, reply) => { log.info(`[POST] /postgres`, "http"); - const result = await remoteController.onFullSync( - JSON.stringify(request.body), - ); + const body = RemoteSyncRequest.safeDecode(JSON.stringify(request.body)); + if (!body.success) { + return reply.status(400).send(body.error); + } + const result = await remoteController.onFullSync(body.data.db); return reply.status(result.status).send(result.body); }); } @@ -223,13 +226,19 @@ export async function createServer( const result = await remoteController.onImportStats(request.body); return reply.status(result.status).send(result.body); }); + + fastify.post("/postgres/dump", async (request, reply) => { + log.info(`[POST] /postgres/dump`, "http"); + const result = await remoteController.redump(); + return reply.status(result.status).send(result.body); + }); } await fastify.listen({ host: hostname, port }); if (remoteController && sourceDb) { log.info(`SOURCE_DATABASE_URL set, triggering initial sync`, "http"); - remoteController.onFullSync(JSON.stringify({ db: sourceDb.toString() })).then((result) => { + remoteController.onFullSync(sourceDb).then((result) => { if (result.status >= 400) { log.error(`Initial sync failed: ${JSON.stringify(result.body)}`, "http"); process.exit(1); diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index 67aa37c..19f85c8 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -517,6 +517,7 @@ ORDER BY -- excluding this makes sure we can use analyzer -- in multi-tenant environments and query != '' + and query not ilike 'explain%' -- and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin'); -- @qd_introspection `); // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated @@ -536,6 +537,7 @@ ORDER BY FROM ${source.schema}.pg_stat_monitor WHERE query not like '%pg_stat_monitor%' and query not like '%@qd_introspection%' + and query not ilike 'explain%' `); // we're excluding `pg_stat_monitor` from the results since it's almost certainly unrelated return await this.segmentedQueryCache.sync(