Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class QueryOptimizer extends EventEmitter<EventMap> {

constructor(
private readonly manager: ConnectionManager,
private readonly connectable: Connectable,
private connectable: Connectable,
config?: {
maxRetries?: number;
queryTimeoutMs?: number;
Expand Down Expand Up @@ -136,6 +136,10 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
this.target = { optimizer, statistics };
}

updateConnectable(connectable: Connectable) {
this.connectable = connectable;
}

stop() {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
this.queries.clear();
Expand Down
17 changes: 5 additions & 12 deletions src/remote/remote-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import { RemoteController } from "./remote-controller.ts";
import { ConnectionManager } from "../sync/connection-manager.ts";
import { RemoteSyncRequest } from "./remote.dto.ts";

test("controller syncs correctly", async () => {
const [sourceDb, targetDb] = await Promise.all([
Expand Down Expand Up @@ -36,17 +35,15 @@
const remote = new RemoteController(innerRemote);

try {
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);

expect(syncResult.status).toEqual(200);

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(),
});
const tablesAfter =
await pool.query("select tablename from pg_tables where schemaname = 'public'");

Check failure on line 46 in src/remote/remote-controller.test.ts

View workflow job for this annotation

GitHub Actions / release

src/remote/remote-controller.test.ts > controller syncs correctly

error: database "optimizing_db_0" does not exist ❯ node_modules/pg-pool/index.js:45:11 ❯ src/remote/remote-controller.test.ts:46:9 ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { length: 101, severity: 'FATAL', code: '3D000', detail: undefined, hint: undefined, position: undefined, internalPosition: undefined, internalQuery: undefined, where: undefined, schema: undefined, table: undefined, dataType: undefined, constraint: undefined, file: 'postinit.c', routine: 'InitPostgres' }
expect(tablesAfter.rowCount).toEqual(1);
const indexesAfter =
await pool.query("select * from pg_indexes where schemaname = 'public'");
Expand Down Expand Up @@ -89,18 +86,16 @@

try {
// First sync the database
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);
expect(syncResult.status).toEqual(200);

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(),
});

// Verify no indexes exist initially
const indexesBefore =
await pool.query("select * from pg_indexes where schemaname = 'public'");

Check failure on line 98 in src/remote/remote-controller.test.ts

View workflow job for this annotation

GitHub Actions / release

src/remote/remote-controller.test.ts > creating an index via endpoint adds it to the optimizing db

error: database "optimizing_db_0" does not exist ❯ node_modules/pg-pool/index.js:45:11 ❯ src/remote/remote-controller.test.ts:98:9 ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { length: 101, severity: 'FATAL', code: '3D000', detail: undefined, hint: undefined, position: undefined, internalPosition: undefined, internalQuery: undefined, where: undefined, schema: undefined, table: undefined, dataType: undefined, constraint: undefined, file: 'postinit.c', routine: 'InitPostgres' }
expect(indexesBefore.rowCount).toEqual(0);

// Create an index via the controller method
Expand Down Expand Up @@ -152,9 +147,7 @@
const remote = new RemoteController(innerRemote);

try {
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);

expect(syncResult.status).toEqual(200);

Expand Down
23 changes: 15 additions & 8 deletions src/remote/remote-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "./remote-controller.dto.ts";
import { ZodError } from "zod";
import { ExportedStats, Statistics } from "@query-doctor/core";
import type { Connectable } from "../sync/connectable.ts";

const SyncStatus = {
NOT_STARTED: "notStarted",
Expand All @@ -35,6 +36,7 @@ export class RemoteController {
private socket?: WebSocket;
private syncResponse?: Awaited<ReturnType<Remote["syncFrom"]>>;
private syncStatus: SyncStatus = SyncStatus.NOT_STARTED;
private lastSourceDb?: Connectable;

constructor(
private readonly remote: Remote,
Expand Down Expand Up @@ -84,7 +86,7 @@ export class RemoteController {

// TODO: type return (Site#2402)
async getStatus(): Promise<unknown> {
if (!this.syncResponse || this.syncStatus !== SyncStatus.COMPLETED) {
if (!this.syncResponse) {
return { status: this.syncStatus };
}
const { schema, meta } = this.syncResponse;
Expand Down Expand Up @@ -117,13 +119,8 @@ export class RemoteController {
} as const;
}

async onFullSync(rawBody: string): Promise<HandlerResult> {
const body = RemoteSyncRequest.safeDecode(rawBody);
if (!body.success) {
return { status: 400, body: body.error };
}

const { db } = body.data;
async onFullSync(db: Connectable): Promise<HandlerResult> {
this.lastSourceDb = db;
try {
this.syncStatus = SyncStatus.IN_PROGRESS;
this.syncResponse = await this.remote.syncFrom(db, {
Expand Down Expand Up @@ -158,6 +155,16 @@ export class RemoteController {
}
}

async redump(): Promise<HandlerResult> {
if (!this.lastSourceDb) {
return {
status: 400,
body: { type: "error", error: "no_source_db", message: "No source database has been synced yet" },
};
}
return this.onFullSync(this.lastSourceDb);
}

async onImportStats(body: unknown): Promise<HandlerResult> {
let stats: ExportedStats[];
try {
Expand Down
2 changes: 1 addition & 1 deletion src/remote/remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@
expect(indexNames).toEqual(expect.arrayContaining(["testing_1234"]));

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: target.withDatabaseName(Remote.defaultOptimizingDbPrefix).toString(),
});

const indexesAfter =
await pool.query("select indexname from pg_indexes where schemaname = 'public'");

Check failure on line 101 in src/remote/remote.test.ts

View workflow job for this annotation

GitHub Actions / release

src/remote/remote.test.ts > syncs correctly

error: database "optimizing_db_0" does not exist ❯ node_modules/pg-pool/index.js:45:11 ❯ src/remote/remote.test.ts:101:9 ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { length: 101, severity: 'FATAL', code: '3D000', detail: undefined, hint: undefined, position: undefined, internalPosition: undefined, internalQuery: undefined, where: undefined, schema: undefined, table: undefined, dataType: undefined, constraint: undefined, file: 'postinit.c', routine: 'InitPostgres' }
expect(indexesAfter.rowCount).toEqual(1);

expect(indexesAfter.rows[0]).toEqual({ indexname: "testing_1234" });
Expand Down
39 changes: 23 additions & 16 deletions src/remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ type RemoteEvents = {
*/
export class Remote extends EventEmitter<RemoteEvents> {
static readonly baseDbName = PgIdentifier.fromString("postgres");
static readonly optimizingDbName = PgIdentifier.fromString(
"optimizing_db",
);
private static readonly optimizingDbPrefix = "optimizing_db";
static defaultOptimizingDbPrefix = PgIdentifier.fromString(`${Remote.optimizingDbPrefix}_0`)

/* Threshold that we determine is "too few rows" for Postgres to start using indexes
* and not defaulting to table scan.
*/
Expand All @@ -55,8 +55,9 @@ export class Remote extends EventEmitter<RemoteEvents> {
* destroyed and re-created on each successful sync along with the db itself
*/
private baseDbURL: Connectable;
/** The URL of the optimizing db */
private readonly optimizingDbUDRL: Connectable;
private generation = 0;
/** The URL of the current generation optimizing db */
private optimizingDbUDRL: Connectable;

private isPolling = false;
private queryLoader?: QueryLoader;
Expand All @@ -74,7 +75,9 @@ export class Remote extends EventEmitter<RemoteEvents> {
) {
super();
this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName);
this.optimizingDbUDRL = targetURL.withDatabaseName(Remote.optimizingDbName);
this.optimizingDbUDRL = targetURL.withDatabaseName(
Remote.defaultOptimizingDbPrefix,
);
this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL);
}

Expand Down Expand Up @@ -210,21 +213,25 @@ export class Remote extends EventEmitter<RemoteEvents> {
}
}

/**
* Drops and recreates the {@link Remote.optimizingDbName} db.
*
* TODO: allow juggling multiple databases in the future
*/
private async resetDatabase(): Promise<void> {
const databaseName = Remote.optimizingDbName;
log.info(`Resetting internal database: ${databaseName}`, "remote");
const prevGeneration = this.generation;
const nextGeneration = prevGeneration + 1;
const nextDbName = PgIdentifier.fromString(
`${Remote.optimizingDbPrefix}_${nextGeneration}`,
);
log.info(`Creating new generation database: ${nextDbName}`, "remote");
const baseDb = this.manager.getOrCreateConnection(this.baseDbURL);
await baseDb.exec(`create database ${nextDbName};`);
const prevDbName = PgIdentifier.fromString(
`${Remote.optimizingDbPrefix}_${prevGeneration}`,
);
this.generation = nextGeneration;
this.optimizingDbUDRL = this.optimizingDbUDRL.withDatabaseName(nextDbName);
this.optimizer.updateConnectable(this.optimizingDbUDRL);
// these cannot be run in the same `exec` block as that implicitly creates transactions
await baseDb.exec(
// drop database does not allow parameterization
`drop database if exists ${databaseName} with (force);`,
`drop database if exists ${prevDbName} with (force);`,
);
await baseDb.exec(`create database ${databaseName};`);
}

private async pipeSchema(
Expand Down
17 changes: 13 additions & 4 deletions src/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { SyncResult } from "../sync/syncer.ts";
import * as errors from "../sync/errors.ts";
import { RemoteController } from "../remote/remote-controller.ts";
import { Connectable } from "../sync/connectable.ts";
import { RemoteSyncRequest } from "../remote/remote.dto.ts";
import { ConnectionManager } from "../sync/connection-manager.ts";
import { Remote } from "../remote/remote.ts";

Expand Down Expand Up @@ -175,9 +176,11 @@ export async function createServer(
if (!sourceDb) {
fastify.post("/postgres", async (request, reply) => {
log.info(`[POST] /postgres`, "http");
const result = await remoteController.onFullSync(
JSON.stringify(request.body),
);
const body = RemoteSyncRequest.safeDecode(JSON.stringify(request.body));
if (!body.success) {
return reply.status(400).send(body.error);
}
const result = await remoteController.onFullSync(body.data.db);
return reply.status(result.status).send(result.body);
});
}
Expand Down Expand Up @@ -223,13 +226,19 @@ export async function createServer(
const result = await remoteController.onImportStats(request.body);
return reply.status(result.status).send(result.body);
});

fastify.post("/postgres/dump", async (request, reply) => {
log.info(`[POST] /postgres/dump`, "http");
const result = await remoteController.redump();
return reply.status(result.status).send(result.body);
});
}

await fastify.listen({ host: hostname, port });

if (remoteController && sourceDb) {
log.info(`SOURCE_DATABASE_URL set, triggering initial sync`, "http");
remoteController.onFullSync(JSON.stringify({ db: sourceDb.toString() })).then((result) => {
remoteController.onFullSync(sourceDb).then((result) => {
if (result.status >= 400) {
log.error(`Initial sync failed: ${JSON.stringify(result.body)}`, "http");
process.exit(1);
Expand Down
2 changes: 2 additions & 0 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ ORDER BY
-- excluding this makes sure we can use analyzer
-- in multi-tenant environments
and query != '<insufficient privilege>'
and query not ilike 'explain%'
-- and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin'); -- @qd_introspection
`); // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated

Expand All @@ -536,6 +537,7 @@ ORDER BY
FROM ${source.schema}.pg_stat_monitor
WHERE query not like '%pg_stat_monitor%'
and query not like '%@qd_introspection%'
and query not ilike 'explain%'
`); // we're excluding `pg_stat_monitor` from the results since it's almost certainly unrelated

return await this.segmentedQueryCache.sync(
Expand Down
Loading