Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { EnableMirroringCommand } from "./commands/enable-mirroring.js";
import { EnableRetiredMirroringCommand } from "./commands/enable-retired-mirroring.js";
import { FillCommand } from "./commands/fill.js";
import { PrepCommand } from "./commands/prep.js";
import { StatusCommand } from "./commands/status.js";
import { SwapCommand } from "./commands/swap.js";
import { SynchronizeCommand } from "./commands/synchronize.js";
import { UnprepCommand } from "./commands/unprep.js";
Expand All @@ -30,6 +31,7 @@ export function createCli(): Cli {
cli.register(EnableRetiredMirroringCommand);
cli.register(FillCommand);
cli.register(PrepCommand);
cli.register(StatusCommand);
cli.register(SwapCommand);
cli.register(SynchronizeCommand);
cli.register(UnprepCommand);
Expand Down
45 changes: 45 additions & 0 deletions src/commands/status.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { describe, expect } from "vitest";
import { sql } from "slonik";

import { commandTest as test } from "../testing/index.js";
import { StatusCommand } from "./status.js";

describe("StatusCommand", () => {
test.scoped({ commandClass: ({}, use) => use(StatusCommand) });

test.beforeEach(async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
created_at DATE NOT NULL
)
`);
});

test("outputs status as JSON", async ({ cli, commandContext }) => {
const exitCode = await cli.run(
["status", "posts", "--json"],
commandContext,
);

expect(exitCode).toBe(0);
const status = JSON.parse(commandContext.stdout.read()?.toString() ?? "");
expect(status).toEqual({
intermediateExists: false,
partitionCount: 0,
mirrorTriggerExists: false,
retiredMirrorTriggerExists: false,
originalIsPartitioned: false,
});
});

test("outputs human-readable status", async ({ cli, commandContext }) => {
const exitCode = await cli.run(["status", "posts"], commandContext);

expect(exitCode).toBe(0);
const output = commandContext.stdout.read()?.toString();
expect(output).toContain("Table: posts");
expect(output).toContain("Intermediate exists:");
expect(output).toContain("Original is partitioned:");
});
});
53 changes: 53 additions & 0 deletions src/commands/status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Command, Option } from "clipanion";

import { BaseCommand } from "./base.js";
import { type Pgslice } from "../pgslice.js";

export class StatusCommand extends BaseCommand {
static override paths = [["status"]];

static override usage = Command.Usage({
description: "Show status information about a table's partitioning state",
details: `
This command displays the current state of a table's partitioning workflow,
including:
- Whether intermediate table exists
- Number of partitions
- Whether mirroring triggers are enabled
- Whether the original table is partitioned
`,
examples: [
["Show status for a table", "$0 status posts"],
["Show status with explicit schema", "$0 status myschema.posts"],
["Output as JSON", "$0 status posts --json"],
],
});

table = Option.String({ required: true, name: "table" });
json = Option.Boolean("--json", false, {
description: "Output status as JSON",
});

override async perform(pgslice: Pgslice): Promise<void> {
const status = await pgslice.status({ table: this.table });

if (this.json) {
this.context.stdout.write(JSON.stringify(status, null, 2) + "\n");
} else {
this.context.stdout.write(`Table: ${this.table}\n`);
this.context.stdout.write(
`Intermediate exists: ${status.intermediateExists}\n`,
);
this.context.stdout.write(
`Original is partitioned: ${status.originalIsPartitioned}\n`,
);
this.context.stdout.write(`Partition count: ${status.partitionCount}\n`);
this.context.stdout.write(
`Mirror trigger exists: ${status.mirrorTriggerExists}\n`,
);
this.context.stdout.write(
`Retired mirror trigger exists: ${status.retiredMirrorTriggerExists}\n`,
);
}
}
}
12 changes: 9 additions & 3 deletions src/mirroring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,18 @@ export class Mirroring {
return sql.identifier([`${this.#source.name}_${suffix}`]);
}

get #triggerName() {
static triggerNameFor(table: Table, targetType: MirroringTargetType): string {
const suffix =
this.#targetType === "intermediate"
targetType === "intermediate"
? "mirror_trigger"
: "retired_mirror_trigger";
return sql.identifier([`${this.#source.name}_${suffix}`]);
return `${table.name}_${suffix}`;
}

get #triggerName() {
return sql.identifier([
Mirroring.triggerNameFor(this.#source, this.#targetType),
]);
}

#buildWhereClause(columns: string[]) {
Expand Down
45 changes: 45 additions & 0 deletions src/pgslice.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,51 @@ describe("Pgslice.addPartitions", () => {
});
});

describe("Pgslice.status", () => {
test("returns status for a plain table", async ({ pgslice, transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
created_at DATE NOT NULL
)
`);

const status = await pgslice.status({ table: "posts" });
expect(status).toEqual({
intermediateExists: false,
partitionCount: 0,
mirrorTriggerExists: false,
retiredMirrorTriggerExists: false,
originalIsPartitioned: false,
});
});

test("returns status after prep", async ({ pgslice, transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
created_at DATE NOT NULL
)
`);

await pgslice.prep(transaction, {
table: "events",
column: "created_at",
period: "month",
partition: true,
});

const status = await pgslice.status({ table: "events" });
expect(status).toEqual({
intermediateExists: true,
partitionCount: 0,
originalIsPartitioned: false,
mirrorTriggerExists: false,
retiredMirrorTriggerExists: false,
});
});
});

describe("Pgslice.synchronize", () => {
test("synchronizes data", async ({ pgslice, transaction }) => {
// Create source table with data
Expand Down
41 changes: 41 additions & 0 deletions src/pgslice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import type {
FillOptions,
Period,
PrepOptions,
StatusOptions,
SwapOptions,
SynchronizeBatchResult,
SynchronizeOptions,
TableStatus,
UnprepOptions,
UnswapOptions,
} from "./types.js";
Expand Down Expand Up @@ -565,4 +567,43 @@ export class Pgslice {
}),
);
}

async status(options: StatusOptions): Promise<TableStatus> {
const table = Table.parse(options.table);
const intermediate = table.intermediate;

const intermediateExists = await intermediate.exists(this.pool);

// Check intermediate for partitions pre-swap, original post-swap
let partitionCount = 0;
if (intermediateExists) {
const partitions = await this.start((conn) =>
intermediate.partitions(conn),
);
partitionCount = partitions.length;
} else {
const partitions = await this.start((conn) => table.partitions(conn));
partitionCount = partitions.length;
}

const mirrorTriggerExists = await table.triggerExists(
this.pool,
Mirroring.triggerNameFor(table, "intermediate"),
);

const retiredMirrorTriggerExists = await table.triggerExists(
this.pool,
Mirroring.triggerNameFor(table, "retired"),
);

const originalIsPartitioned = await table.isPartitioned(this.pool);

return {
intermediateExists,
partitionCount,
mirrorTriggerExists,
retiredMirrorTriggerExists,
originalIsPartitioned,
};
}
}
98 changes: 98 additions & 0 deletions src/table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,104 @@ import { sql } from "slonik";
import { pgsliceTest as test } from "./testing/index.js";
import { Table } from "./table.js";

describe("Table.isPartitioned", () => {
test("returns true for partitioned table", async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE test_table (
id BIGSERIAL,
created_at DATE NOT NULL,
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at)
`);

const table = Table.parse("test_table");
const result = await table.isPartitioned(transaction);

expect(result).toBe(true);
});

test("returns false for regular table", async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT)
`);

const table = Table.parse("test_table");
const result = await table.isPartitioned(transaction);

expect(result).toBe(false);
});

test("returns false for non-existent table", async ({ transaction }) => {
const table = Table.parse("nonexistent_table");
const result = await table.isPartitioned(transaction);

expect(result).toBe(false);
});
});

describe("Table.triggerExists", () => {
test("returns true when trigger exists", async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT)
`);
await transaction.query(sql.unsafe`
CREATE FUNCTION test_trigger_fn() RETURNS TRIGGER AS $$
BEGIN RETURN NEW; END;
$$ LANGUAGE plpgsql
`);
await transaction.query(sql.unsafe`
CREATE TRIGGER test_trigger
AFTER INSERT ON test_table
FOR EACH ROW EXECUTE FUNCTION test_trigger_fn()
`);

const table = Table.parse("test_table");
const result = await table.triggerExists(transaction, "test_trigger");

expect(result).toBe(true);
});

test("returns false when trigger does not exist", async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT)
`);

const table = Table.parse("test_table");
const result = await table.triggerExists(
transaction,
"nonexistent_trigger",
);

expect(result).toBe(false);
});

test("returns false for trigger on different table", async ({
transaction,
}) => {
await transaction.query(sql.unsafe`
CREATE TABLE test_table (id BIGSERIAL PRIMARY KEY, name TEXT)
`);
await transaction.query(sql.unsafe`
CREATE TABLE other_table (id BIGSERIAL PRIMARY KEY, name TEXT)
`);
await transaction.query(sql.unsafe`
CREATE FUNCTION test_trigger_fn() RETURNS TRIGGER AS $$
BEGIN RETURN NEW; END;
$$ LANGUAGE plpgsql
`);
await transaction.query(sql.unsafe`
CREATE TRIGGER test_trigger
AFTER INSERT ON other_table
FOR EACH ROW EXECUTE FUNCTION test_trigger_fn()
`);

const table = Table.parse("test_table");
const result = await table.triggerExists(transaction, "test_trigger");

expect(result).toBe(false);
});
});

describe("Table.maxId", () => {
test("returns max bigint ID from table", async ({ transaction }) => {
await transaction.query(sql.unsafe`
Expand Down
36 changes: 36 additions & 0 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,42 @@ export class Table {
return result.count > 0;
}

/**
* Checks if this table is a partitioned table (relkind = 'p').
*/
async isPartitioned(connection: CommonQueryMethods): Promise<boolean> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it adds more code my preference was to delegate the actual logic to Table in order to maintain separation of concerns.

const result = await connection.one(
sql.type(z.object({ count: z.coerce.number() }))`
SELECT COUNT(*) FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ${this.schema}
AND c.relname = ${this.name}
AND c.relkind = 'p'
`,
);
return result.count > 0;
}

/**
* Checks if a trigger with the given name exists on this table.
*/
async triggerExists(
connection: CommonQueryMethods,
triggerName: string,
): Promise<boolean> {
const result = await connection.one(
sql.type(z.object({ count: z.coerce.number() }))`
SELECT COUNT(*) FROM pg_trigger t
JOIN pg_class c ON t.tgrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ${this.schema}
AND c.relname = ${this.name}
AND t.tgname = ${triggerName}
`,
);
return result.count > 0;
}

/**
* Gets column metadata for this table (excluding generated columns).
*/
Expand Down
Loading