diff --git a/src/cli.ts b/src/cli.ts index 891edbf..d9156d6 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -8,6 +8,7 @@ import { EnableMirroringCommand } from "./commands/enable-mirroring.js"; import { EnableRetiredMirroringCommand } from "./commands/enable-retired-mirroring.js"; import { FillCommand } from "./commands/fill.js"; import { PrepCommand } from "./commands/prep.js"; +import { StatusCommand } from "./commands/status.js"; import { SwapCommand } from "./commands/swap.js"; import { SynchronizeCommand } from "./commands/synchronize.js"; import { UnprepCommand } from "./commands/unprep.js"; @@ -30,6 +31,7 @@ export function createCli(): Cli { cli.register(EnableRetiredMirroringCommand); cli.register(FillCommand); cli.register(PrepCommand); + cli.register(StatusCommand); cli.register(SwapCommand); cli.register(SynchronizeCommand); cli.register(UnprepCommand); diff --git a/src/commands/status.test.ts b/src/commands/status.test.ts new file mode 100644 index 0000000..47dd460 --- /dev/null +++ b/src/commands/status.test.ts @@ -0,0 +1,45 @@ +import { describe, expect } from "vitest"; +import { sql } from "slonik"; + +import { commandTest as test } from "../testing/index.js"; +import { StatusCommand } from "./status.js"; + +describe("StatusCommand", () => { + test.scoped({ commandClass: ({}, use) => use(StatusCommand) }); + + test.beforeEach(async ({ transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id SERIAL PRIMARY KEY, + created_at DATE NOT NULL + ) + `); + }); + + test("outputs status as JSON", async ({ cli, commandContext }) => { + const exitCode = await cli.run( + ["status", "posts", "--json"], + commandContext, + ); + + expect(exitCode).toBe(0); + const status = JSON.parse(commandContext.stdout.read()?.toString() ?? ""); + expect(status).toEqual({ + intermediateExists: false, + partitionCount: 0, + mirrorTriggerExists: false, + retiredMirrorTriggerExists: false, + originalIsPartitioned: false, + }); + }); + + test("outputs human-readable status", async ({ cli, commandContext }) => { + const exitCode = await cli.run(["status", "posts"], commandContext); + + expect(exitCode).toBe(0); + const output = commandContext.stdout.read()?.toString(); + expect(output).toContain("Table: posts"); + expect(output).toContain("Intermediate exists:"); + expect(output).toContain("Original is partitioned:"); + }); +}); diff --git a/src/commands/status.ts b/src/commands/status.ts new file mode 100644 index 0000000..1fd0162 --- /dev/null +++ b/src/commands/status.ts @@ -0,0 +1,53 @@ +import { Command, Option } from "clipanion"; + +import { BaseCommand } from "./base.js"; +import { type Pgslice } from "../pgslice.js"; + +export class StatusCommand extends BaseCommand { + static override paths = [["status"]]; + + static override usage = Command.Usage({ + description: "Show status information about a table's partitioning state", + details: ` + This command displays the current state of a table's partitioning workflow, + including: + - Whether intermediate table exists + - Number of partitions + - Whether mirroring triggers are enabled + - Whether the original table is partitioned + `, + examples: [ + ["Show status for a table", "$0 status posts"], + ["Show status with explicit schema", "$0 status myschema.posts"], + ["Output as JSON", "$0 status posts --json"], + ], + }); + + table = Option.String({ required: true, name: "table" }); + json = Option.Boolean("--json", false, { + description: "Output status as JSON", + }); + + override async perform(pgslice: Pgslice): Promise { + const status = await pgslice.status({ table: this.table }); + + if (this.json) { + this.context.stdout.write(JSON.stringify(status, null, 2) + "\n"); + } else { + this.context.stdout.write(`Table: ${this.table}\n`); + this.context.stdout.write( + `Intermediate exists: ${status.intermediateExists}\n`, + ); + this.context.stdout.write( + `Original is partitioned: ${status.originalIsPartitioned}\n`, + ); + this.context.stdout.write(`Partition count: ${status.partitionCount}\n`); + this.context.stdout.write( + `Mirror trigger exists: ${status.mirrorTriggerExists}\n`, + ); + this.context.stdout.write( + `Retired mirror trigger exists: ${status.retiredMirrorTriggerExists}\n`, + ); + } + } +} diff --git a/src/mirroring.ts b/src/mirroring.ts index 5695e65..1f470bc 100644 --- a/src/mirroring.ts +++ b/src/mirroring.ts @@ -61,12 +61,18 @@ export class Mirroring { return sql.identifier([`${this.#source.name}_${suffix}`]); } - get #triggerName() { + static triggerNameFor(table: Table, targetType: MirroringTargetType): string { const suffix = - this.#targetType === "intermediate" + targetType === "intermediate" ? "mirror_trigger" : "retired_mirror_trigger"; - return sql.identifier([`${this.#source.name}_${suffix}`]); + return `${table.name}_${suffix}`; + } + + get #triggerName() { + return sql.identifier([ + Mirroring.triggerNameFor(this.#source, this.#targetType), + ]); } #buildWhereClause(columns: string[]) { diff --git a/src/pgslice.test.ts b/src/pgslice.test.ts index b661fcd..9002755 100644 --- a/src/pgslice.test.ts +++ b/src/pgslice.test.ts @@ -666,6 +666,51 @@ describe("Pgslice.addPartitions", () => { }); }); +describe("Pgslice.status", () => { + test("returns status for a plain table", async ({ pgslice, transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id SERIAL PRIMARY KEY, + created_at DATE NOT NULL + ) + `); + + const status = await pgslice.status({ table: "posts" }); + expect(status).toEqual({ + intermediateExists: false, + partitionCount: 0, + mirrorTriggerExists: false, + retiredMirrorTriggerExists: false, + originalIsPartitioned: false, + }); + }); + + test("returns status after prep", async ({ pgslice, transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE events ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL + ) + `); + + await pgslice.prep(transaction, { + table: "events", + column: "created_at", + period: "month", + partition: true, + }); + + const status = await pgslice.status({ table: "events" }); + expect(status).toEqual({ + intermediateExists: true, + partitionCount: 0, + originalIsPartitioned: false, + mirrorTriggerExists: false, + retiredMirrorTriggerExists: false, + }); + }); +}); + describe("Pgslice.synchronize", () => { test("synchronizes data", async ({ pgslice, transaction }) => { // Create source table with data diff --git a/src/pgslice.ts b/src/pgslice.ts index 2482e41..d019c5b 100644 --- a/src/pgslice.ts +++ b/src/pgslice.ts @@ -16,9 +16,11 @@ import type { FillOptions, Period, PrepOptions, + StatusOptions, SwapOptions, SynchronizeBatchResult, SynchronizeOptions, + TableStatus, UnprepOptions, UnswapOptions, } from "./types.js"; @@ -565,4 +567,43 @@ export class Pgslice { }), ); } + + async status(options: StatusOptions): Promise { + const table = Table.parse(options.table); + const intermediate = table.intermediate; + + const intermediateExists = await intermediate.exists(this.pool); + + // Check intermediate for partitions pre-swap, original post-swap + let partitionCount = 0; + if (intermediateExists) { + const partitions = await this.start((conn) => + intermediate.partitions(conn), + ); + partitionCount = partitions.length; + } else { + const partitions = await this.start((conn) => table.partitions(conn)); + partitionCount = partitions.length; + } + + const mirrorTriggerExists = await table.triggerExists( + this.pool, + Mirroring.triggerNameFor(table, "intermediate"), + ); + + const retiredMirrorTriggerExists = await table.triggerExists( + this.pool, + Mirroring.triggerNameFor(table, "retired"), + ); + + const originalIsPartitioned = await table.isPartitioned(this.pool); + + return { + intermediateExists, + partitionCount, + mirrorTriggerExists, + retiredMirrorTriggerExists, + originalIsPartitioned, + }; + } } diff --git a/src/table.test.ts b/src/table.test.ts index 4746d53..4169380 100644 --- a/src/table.test.ts +++ b/src/table.test.ts @@ -4,6 +4,104 @@ import { sql } from "slonik"; import { pgsliceTest as test } from "./testing/index.js"; import { Table } from "./table.js"; +describe("Table.isPartitioned", () => { + test("returns true for partitioned table", async ({ transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE test_table ( + id BIGSERIAL, + created_at DATE NOT NULL, + PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `); + + const table = Table.parse("test_table"); + const result = await table.isPartitioned(transaction); + + expect(result).toBe(true); + }); + + test("returns false for regular table", async ({ transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + + const table = Table.parse("test_table"); + const result = await table.isPartitioned(transaction); + + expect(result).toBe(false); + }); + + test("returns false for non-existent table", async ({ transaction }) => { + const table = Table.parse("nonexistent_table"); + const result = await table.isPartitioned(transaction); + + expect(result).toBe(false); + }); +}); + +describe("Table.triggerExists", () => { + test("returns true when trigger exists", async ({ transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + CREATE FUNCTION test_trigger_fn() RETURNS TRIGGER AS $$ + BEGIN RETURN NEW; END; + $$ LANGUAGE plpgsql + `); + await transaction.query(sql.unsafe` + CREATE TRIGGER test_trigger + AFTER INSERT ON test_table + FOR EACH ROW EXECUTE FUNCTION test_trigger_fn() + `); + + const table = Table.parse("test_table"); + const result = await table.triggerExists(transaction, "test_trigger"); + + expect(result).toBe(true); + }); + + test("returns false when trigger does not exist", async ({ transaction }) => { + await transaction.query(sql.unsafe` + CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + + const table = Table.parse("test_table"); + const result = await table.triggerExists( + transaction, + "nonexistent_trigger", + ); + + expect(result).toBe(false); + }); + + test("returns false for trigger on different table", async ({ + transaction, + }) => { + await transaction.query(sql.unsafe` + CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + CREATE TABLE other_table (id BIGSERIAL PRIMARY KEY, name TEXT) + `); + await transaction.query(sql.unsafe` + CREATE FUNCTION test_trigger_fn() RETURNS TRIGGER AS $$ + BEGIN RETURN NEW; END; + $$ LANGUAGE plpgsql + `); + await transaction.query(sql.unsafe` + CREATE TRIGGER test_trigger + AFTER INSERT ON other_table + FOR EACH ROW EXECUTE FUNCTION test_trigger_fn() + `); + + const table = Table.parse("test_table"); + const result = await table.triggerExists(transaction, "test_trigger"); + + expect(result).toBe(false); + }); +}); + describe("Table.maxId", () => { test("returns max bigint ID from table", async ({ transaction }) => { await transaction.query(sql.unsafe` diff --git a/src/table.ts b/src/table.ts index f1c71c5..9bacd45 100644 --- a/src/table.ts +++ b/src/table.ts @@ -155,6 +155,42 @@ export class Table { return result.count > 0; } + /** + * Checks if this table is a partitioned table (relkind = 'p'). + */ + async isPartitioned(connection: CommonQueryMethods): Promise { + const result = await connection.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*) FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE n.nspname = ${this.schema} + AND c.relname = ${this.name} + AND c.relkind = 'p' + `, + ); + return result.count > 0; + } + + /** + * Checks if a trigger with the given name exists on this table. + */ + async triggerExists( + connection: CommonQueryMethods, + triggerName: string, + ): Promise { + const result = await connection.one( + sql.type(z.object({ count: z.coerce.number() }))` + SELECT COUNT(*) FROM pg_trigger t + JOIN pg_class c ON t.tgrelid = c.oid + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE n.nspname = ${this.schema} + AND c.relname = ${this.name} + AND t.tgname = ${triggerName} + `, + ); + return result.count > 0; + } + /** * Gets column metadata for this table (excluding generated columns). */ diff --git a/src/types.ts b/src/types.ts index f407f9c..8a0826e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -191,3 +191,21 @@ export interface AnalyzeOptions { export interface UnprepOptions { table: string; } + +/** + * Options for the `status` command. + */ +export interface StatusOptions { + table: string; +} + +/** + * Status information about a table's partitioning state. + */ +export interface TableStatus { + intermediateExists: boolean; + partitionCount: number; + mirrorTriggerExists: boolean; + retiredMirrorTriggerExists: boolean; + originalIsPartitioned: boolean; +}