From c2df119d9979ff818f953b9eb306e30fe9e8ce1c Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 1 Jul 2026 16:29:07 -0400 Subject: [PATCH 1/4] Deferred migrations --- quickwit/Makefile | 2 + quickwit/quickwit-metastore/README.md | 10 + quickwit/quickwit-metastore/build.rs | 1 + .../migrations/postgresql_deferred/README.md | 7 + .../src/metastore/postgres/metastore.rs | 13 +- .../src/metastore/postgres/migrator.rs | 246 ++++++++++++++---- .../src/metastore/postgres/mod.rs | 1 + 7 files changed, 222 insertions(+), 58 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md diff --git a/quickwit/Makefile b/quickwit/Makefile index e09001ffe0b..c4f3b4a6531 100644 --- a/quickwit/Makefile +++ b/quickwit/Makefile @@ -28,6 +28,8 @@ migrations-lint: @echo "Linting Postgres migrations with diesel-guard" @(command -v diesel-guard >/dev/null || (echo "diesel-guard is not installed. Please install using 'cargo install diesel-guard'." && exit 1)) @diesel-guard check quickwit-metastore/migrations/postgresql/ + @if ls quickwit-metastore/migrations/postgresql_deferred/*.sql >/dev/null 2>&1; then \ + diesel-guard check quickwit-metastore/migrations/postgresql_deferred/; fi unused-deps: @echo "Checking for unused dependencies" diff --git a/quickwit/quickwit-metastore/README.md b/quickwit/quickwit-metastore/README.md index 0743ec9d66c..2af73ddeda2 100644 --- a/quickwit/quickwit-metastore/README.md +++ b/quickwit/quickwit-metastore/README.md @@ -53,3 +53,13 @@ You can then use the following commands to apply/revert your postgresql migratio sqlx migrate run --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql sqlx migrate revert --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql ``` + +## Deferred migrations + +`migrations/postgresql_deferred` holds long-running, degrade-gracefully migrations (e.g. `CREATE INDEX CONCURRENTLY`). +They run in a background task after readiness, elected across pods by a Postgres advisory lock, and share the +`_sqlx_migrations` table with the required track, so version numbers must be globally unique across both dirs and each +migration must be idempotent. See `migrations/postgresql_deferred/README.md` for authoring rules. +``` +sqlx migrate run --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql_deferred +``` diff --git a/quickwit/quickwit-metastore/build.rs b/quickwit/quickwit-metastore/build.rs index c1c9cc14375..889a77a6f99 100644 --- a/quickwit/quickwit-metastore/build.rs +++ b/quickwit/quickwit-metastore/build.rs @@ -14,4 +14,5 @@ fn main() { println!("cargo:rerun-if-changed=migrations/postgresql"); + println!("cargo:rerun-if-changed=migrations/postgresql_deferred"); } diff --git a/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md new file mode 100644 index 00000000000..dd72e838905 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md @@ -0,0 +1,7 @@ +# Deferred PostgreSQL migrations + +Long, degrade-gracefully-only migrations (e.g. `CREATE INDEX CONCURRENTLY`) run in a background task after readiness. Both tracks share the `_sqlx_migrations` table. + +- Version must be globally unique across both dirs (continue the single sequence). +- Must be idempotent: `-- no-transaction` first line, then `DROP INDEX CONCURRENTLY IF EXISTS ...; CREATE INDEX CONCURRENTLY IF NOT EXISTS ...`. +- Never depend on an unshipped required migration; required migrations must never depend on a deferred one. \ No newline at end of file diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index de080bfcc4e..e6229bfc7b0 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -62,15 +62,15 @@ use tracing::{debug, info, instrument, warn}; use uuid::Uuid; use super::error::convert_sqlx_err; -use super::migrator::run_migrations; +use super::migrator::{run_migrations, spawn_deferred_migrations}; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; use super::parquet_model::{InsertableParquetSplit, ParquetSplitRecord, PgParquetSplit}; use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters_and_order_by, establish_connection}; use super::{ - QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, - QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, + QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, + QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, }; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, @@ -125,6 +125,7 @@ impl PostgresqlMetastore { let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); + let skip_deferred = get_bool_from_env(QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, false); let connection_pool = establish_connection( connection_uri, @@ -139,6 +140,12 @@ impl PostgresqlMetastore { run_migrations(&connection_pool, skip_migrations, skip_locking).await?; + // Deferred migrations elect a leader via advisory locks, so we can't run them when locking + // is skipped. read_only and skip_migrations likewise opt out of any migration work. + if !skip_migrations && !read_only && !skip_locking && !skip_deferred { + spawn_deferred_migrations(connection_pool.clone()); + } + let metastore = PostgresqlMetastore { uri: connection_uri.clone(), connection_pool, diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index 03c40eec666..d650c37b381 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -17,50 +17,74 @@ use std::collections::BTreeMap; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sqlx::migrate::{Migrate, Migrator}; use sqlx::{Acquire, PgConnection, Postgres}; -use tracing::{error, instrument}; +use tracing::{error, info}; use super::pool::TrackedPool; +// The deferred migrations should be attempted by only one metastore pod. We do that by having +// metastore pods try to acquire a lock to run the deferred migrations. Pods that don't get the lock +// ignore deferred migrations. The locks are implemented using Postgres advisory locks. +// +// Advisory locks are Postgres locks on custom defined resources, represented by a key of two ints. +// These two ints are arbitrary magic (but stable) numbers that make up the lock for running deferred +// migrations. +const DEFERRED_MIGRATIONS_LOCK_KEY_1: i32 = 424242; +const DEFERRED_MIGRATIONS_LOCK_KEY_2: i32 = 1; + fn get_migrations() -> Migrator { sqlx::migrate!("migrations/postgresql") } +fn get_deferred_migrations() -> Migrator { + sqlx::migrate!("migrations/postgresql_deferred") +} + /// Initializes the database and runs the SQL migrations stored in the /// `quickwit-metastore/migrations` directory. -#[instrument(skip_all)] +/// +/// Runs on a raw pooled connection -- not wrapped in an outer transaction. +/// sqlx's `Migrator::run_direct` handles per-migration transactionality +/// itself, honoring each migration's `no_tx` flag (set by a +/// `-- no-transaction` directive as the first line of the migration file). +/// Wrapping the run in our own transaction would defeat that for migrations +/// that must execute outside a transaction block (e.g. `CREATE INDEX +/// CONCURRENTLY`). +/// +/// Atomicity is per-migration, not per-run: a failure on migration N leaves +/// migrations 1..N-1 applied and committed in `_sqlx_migrations`. The +/// operator fixes the failing migration and re-runs. +// #[instrument(skip_all)] pub(super) async fn run_migrations( pool: &TrackedPool, skip_migrations: bool, skip_locking: bool, ) -> MetastoreResult<()> { - let mut tx = pool.begin().await?; - let conn = tx.acquire().await?; - + let mut conn = pool.acquire().await?; let mut migrator = get_migrations(); + // ignore_missing will throw if any migrations applied to the DB are not present in the set of + // migrations it's given to run. Given that we also have deferred migrations, that will never be + // true. + migrator.set_ignore_missing(true); + if skip_locking { migrator.set_locking(false); } - if !skip_migrations { - // this is an hidden function, made to get "around the annoying "implementation of `Acquire` - // is not general enough" error", which is the error we get otherwise. - let migrate_result = migrator.run_direct(conn).await; + if skip_migrations { + return check_migrations(migrator, &mut conn).await; + } - let Err(migrate_error) = migrate_result else { - tx.commit().await?; - return Ok(()); - }; - tx.rollback().await?; + // this is a hidden function, made to get "around the annoying "implementation of `Acquire` + // is not general enough" error", which is the error we get otherwise. + if let Err(migrate_error) = migrator.run_direct(&mut *conn).await { error!(error=%migrate_error, "failed to run PostgreSQL migrations"); - - Err(MetastoreError::Internal { + return Err(MetastoreError::Internal { message: "failed to run PostgreSQL migrations".to_string(), cause: migrate_error.to_string(), - }) - } else { - check_migrations(migrator, conn).await + }); } + Ok(()) } async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { @@ -94,53 +118,33 @@ async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> Metast }); } }; - let expected_migrations: BTreeMap<_, _> = migrator + let applied_by_version: BTreeMap<_, _> = applied_migrations .iter() - .filter(|migration| migration.migration_type.is_up_migration()) - .map(|migration| (migration.version, migration)) + .map(|applied_migration| (applied_migration.version, applied_migration)) .collect(); - if applied_migrations.len() < expected_migrations.len() { - error!( - "missing migrations, expected {} migrations, only {} present in database", - expected_migrations.len(), - applied_migrations.len() - ); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( - "missing migrations, expected {} migrations, only {} present in database", - expected_migrations.len(), - applied_migrations.len() - ), - }); - } - for applied_migration in applied_migrations { - let Some(migration) = expected_migrations.get(&applied_migration.version) else { - error!( - "found unknown migration {} in database", - applied_migration.version - ); + for expected_migration in migrator + .iter() + .filter(|migration| migration.migration_type.is_up_migration()) + { + let Some(applied_migration) = applied_by_version.get(&expected_migration.version) else { + error!("missing required migration {}", expected_migration.version); return Err(MetastoreError::Internal { message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( - "found unknown migration {} in database", - applied_migration.version - ), + cause: format!("missing required migration {}", expected_migration.version), }); }; - if migration.checksum != applied_migration.checksum { + if expected_migration.checksum != applied_migration.checksum { error!( - "migration {} differ between database and expected value", - applied_migration.version + "migration {} differs between database and expected value", + expected_migration.version ); return Err(MetastoreError::Internal { message: "failed to validate PostgreSQL migrations".to_string(), cause: format!( - "migration {} differ between database and expected value", - applied_migration.version + "migration {} differs between database and expected value", + expected_migration.version ), }); } @@ -148,15 +152,76 @@ async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> Metast Ok(()) } +enum DeferredOutcome { + LockNotAcquired, + Applied, +} + +pub(super) async fn run_deferred_migrations( + pool: &TrackedPool, + mut migrator: Migrator, +) -> MetastoreResult { + let mut conn: PgConnection = pool.acquire().await?.detach(); + + // this query returns either true or false: we got the lock, or we didn't. + let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1, $2)") + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_1) + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_2) + .fetch_one(&mut conn) + .await + .map_err(|error| MetastoreError::Internal { + message: "failed to query deferred-migration advisory lock".to_string(), + cause: error.to_string(), + })?; + if !acquired { + return Ok(DeferredOutcome::LockNotAcquired); + } + + // we've taken out our own lock; we don't need sqlx to also try to do so. + migrator.set_locking(false); + migrator.set_ignore_missing(true); + let run_result = migrator.run_direct(&mut conn).await; + + // dropping the detached connection closes the session, which also releases the lock + drop(conn); + + if let Err(migrate_error) = run_result { + error!(error=%migrate_error, "failed to run deferred PostgreSQL migrations"); + return Err(MetastoreError::Internal { + message: "failed to run deferred PostgreSQL migrations".to_string(), + cause: migrate_error.to_string(), + }); + } + Ok(DeferredOutcome::Applied) +} + +/// Spawns the one-shot, detached deferred-migration attempt. Fire-and-forget: +/// self-terminates after one attempt or when the process exits. +/// TODO: Figure out what to do here on error etc. +pub(super) fn spawn_deferred_migrations(pool: TrackedPool) { + quickwit_common::spawn_named_task( + async move { + match run_deferred_migrations(&pool, get_deferred_migrations()).await { + Ok(DeferredOutcome::Applied) => info!("deferred PostgreSQL migrations applied"), + Ok(DeferredOutcome::LockNotAcquired) => { + info!("deferred PostgreSQL migrations handled by another node") + } + Err(error) => error!(%error, "deferred PostgreSQL migrations failed"), + } + }, + "postgres_deferred_migrations", + ); +} + #[cfg(test)] mod tests { use std::time::Duration; use quickwit_common::uri::Uri; use sqlx::Acquire; - use sqlx::migrate::Migrate; + use sqlx::migrate::{Migrate, Migrator}; - use super::{get_migrations, run_migrations}; + use super::{DeferredOutcome, get_migrations, run_deferred_migrations, run_migrations}; use crate::metastore::postgres::utils::establish_connection; #[tokio::test] @@ -183,6 +248,21 @@ mod tests { // we just ran migration, nothing else to run run_migrations(&connection_pool, true, false).await.unwrap(); + // an unknown high-version row (a deferred migration, or a newer rolled-back image) is + // tolerated: the required-track check only asserts required migrations are present + sqlx::query( + "INSERT INTO _sqlx_migrations (version, description, success, checksum, \ + execution_time) VALUES (999999, 'tolerance test row', true, '\\x00'::bytea, 0)", + ) + .execute(&connection_pool) + .await + .unwrap(); + run_migrations(&connection_pool, true, false).await.unwrap(); + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 999999") + .execute(&connection_pool) + .await + .unwrap(); + let migrations = get_migrations(); let last_migration = migrations .iter() @@ -227,4 +307,60 @@ mod tests { run_migrations(&connection_pool, true, false).await.unwrap(); } } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_deferred_migrations_elect_single_runner() { + let _ = tracing_subscriber::fmt::try_init(); + + dotenvy::dotenv().ok(); + let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") + .expect("environment variable `QW_TEST_DATABASE_URL` should be set") + .parse() + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); + let connection_pool = + establish_connection(&uri, 2, 5, Duration::from_secs(5), None, None, false) + .await + .unwrap(); + + // in case a previous attempt ran before us + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 90001") + .execute(&connection_pool) + .await + .unwrap(); + + // keeps the lock winner busy while the other node tries to acquire it + let migrations_dir = tempfile::tempdir().unwrap(); + std::fs::write( + migrations_dir.path().join("90001_slow_marker.up.sql"), + "SELECT pg_sleep(1)", + ) + .unwrap(); + + let (result_1, result_2) = tokio::join!( + run_deferred_migrations( + &connection_pool, + Migrator::new(migrations_dir.path()).await.unwrap() + ), + run_deferred_migrations( + &connection_pool, + Migrator::new(migrations_dir.path()).await.unwrap() + ), + ); + + let outcomes = [ + result_1.expect("first deferred run should not error"), + result_2.expect("second deferred run should not error"), + ]; + let applied = outcomes + .iter() + .filter(|outcome| matches!(outcome, DeferredOutcome::Applied)) + .count(); + let not_acquired = outcomes + .iter() + .filter(|outcome| matches!(outcome, DeferredOutcome::LockNotAcquired)) + .count(); + assert_eq!(applied, 1, "exactly one runner should win the lock and apply"); + assert_eq!(not_acquired, 1, "the other runner should not acquire the lock"); + } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index a1c99997f3a..d05080174c5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -29,4 +29,5 @@ pub use metastore::PostgresqlMetastore; const QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATIONS"; const QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATION_LOCKING"; +const QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS"; const QW_POSTGRES_READ_ONLY_ENV_KEY: &str = "QW_POSTGRES_READ_ONLY"; From a1b638bbbd29a4384d566ede8b086d228e49f380 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 1 Jul 2026 18:41:12 -0400 Subject: [PATCH 2/4] Refactored completely --- .../src/metastore/postgres/metastore.rs | 18 +- .../src/metastore/postgres/metrics.rs | 8 +- .../src/metastore/postgres/migrator.rs | 556 +++++++++++------- 3 files changed, 344 insertions(+), 238 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index e6229bfc7b0..ef4f6bd5230 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -62,16 +62,13 @@ use tracing::{debug, info, instrument, warn}; use uuid::Uuid; use super::error::convert_sqlx_err; -use super::migrator::{run_migrations, spawn_deferred_migrations}; +use super::migrator::Migrations; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; use super::parquet_model::{InsertableParquetSplit, ParquetSplitRecord, PgParquetSplit}; use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters_and_order_by, establish_connection}; -use super::{ - QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, - QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, -}; +use super::QW_POSTGRES_READ_ONLY_ENV_KEY; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; @@ -123,9 +120,6 @@ impl PostgresqlMetastore { .expect("PostgreSQL metastore config should have been validated"); let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); - let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); - let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); - let skip_deferred = get_bool_from_env(QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, false); let connection_pool = establish_connection( connection_uri, @@ -138,13 +132,7 @@ impl PostgresqlMetastore { ) .await?; - run_migrations(&connection_pool, skip_migrations, skip_locking).await?; - - // Deferred migrations elect a leader via advisory locks, so we can't run them when locking - // is skipped. read_only and skip_migrations likewise opt out of any migration work. - if !skip_migrations && !read_only && !skip_locking && !skip_deferred { - spawn_deferred_migrations(connection_pool.clone()); - } + Migrations::new(connection_pool.clone()).run().await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index b7647d4320c..c7bbebeba86 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use quickwit_metrics::{LazyGauge, lazy_gauge}; +use quickwit_metrics::{LazyCounter, LazyGauge, lazy_counter, lazy_gauge}; + +pub(super) static DEFERRED_MIGRATIONS_APPLY_ERRORS: LazyCounter = lazy_counter!( + name: "deferred_migrations_apply_errors_total", + description: "Number of failed deferred PostgreSQL migration attempts.", + subsystem: "metastore", +); pub(super) static ACQUIRE_CONNECTIONS: LazyGauge = lazy_gauge!( name: "acquire_connections", diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index d650c37b381..5fdc7e21645 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -14,12 +14,18 @@ use std::collections::BTreeMap; +use quickwit_common::get_bool_from_env; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sqlx::migrate::{Migrate, Migrator}; use sqlx::{Acquire, PgConnection, Postgres}; use tracing::{error, info}; +use super::metrics::DEFERRED_MIGRATIONS_APPLY_ERRORS; use super::pool::TrackedPool; +use super::{ + QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, + QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, +}; // The deferred migrations should be attempted by only one metastore pod. We do that by having // metastore pods try to acquire a lock to run the deferred migrations. Pods that don't get the lock @@ -39,85 +45,149 @@ fn get_deferred_migrations() -> Migrator { sqlx::migrate!("migrations/postgresql_deferred") } -/// Initializes the database and runs the SQL migrations stored in the -/// `quickwit-metastore/migrations` directory. -/// -/// Runs on a raw pooled connection -- not wrapped in an outer transaction. -/// sqlx's `Migrator::run_direct` handles per-migration transactionality -/// itself, honoring each migration's `no_tx` flag (set by a -/// `-- no-transaction` directive as the first line of the migration file). -/// Wrapping the run in our own transaction would defeat that for migrations -/// that must execute outside a transaction block (e.g. `CREATE INDEX -/// CONCURRENTLY`). -/// -/// Atomicity is per-migration, not per-run: a failure on migration N leaves -/// migrations 1..N-1 applied and committed in `_sqlx_migrations`. The -/// operator fixes the failing migration and re-runs. -// #[instrument(skip_all)] -pub(super) async fn run_migrations( - pool: &TrackedPool, +enum DeferredOutcome { + LockNotAcquired, + Applied, + Failed, +} + +/// Runs the PostgreSQL metastore migrations: the required ones synchronously (gating readiness) +/// and, unless disabled, the deferred ones in a detached background task. +pub(super) struct Migrations { + connection_pool: TrackedPool, skip_migrations: bool, skip_locking: bool, -) -> MetastoreResult<()> { - let mut conn = pool.acquire().await?; - let mut migrator = get_migrations(); + skip_deferred: bool, +} - // ignore_missing will throw if any migrations applied to the DB are not present in the set of - // migrations it's given to run. Given that we also have deferred migrations, that will never be - // true. - migrator.set_ignore_missing(true); +impl Migrations { + pub(super) fn new(connection_pool: TrackedPool) -> Self { + Migrations { + connection_pool, + skip_migrations: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false), + skip_locking: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false), + skip_deferred: get_bool_from_env(QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, false), + } + } - if skip_locking { - migrator.set_locking(false); + /// Runs the required migrations, then spawns the deferred migrations unless disabled. Deferred + /// election relies on advisory locks, so it is skipped whenever migration locking is. + pub(super) async fn run(&self) -> MetastoreResult<()> { + self.run_required(get_migrations()).await?; + + if !self.skip_migrations && !self.skip_locking && !self.skip_deferred { + self.spawn_deferred(); + } + Ok(()) } - if skip_migrations { - return check_migrations(migrator, &mut conn).await; + async fn run_required(&self, mut migrator: Migrator) -> MetastoreResult<()> { + let mut conn = self.connection_pool.acquire().await?; + + if self.skip_locking { + migrator.set_locking(false); + } + if self.skip_migrations { + return check_migrations(&migrator, &mut conn).await; + } + do_migrations(migrator, &mut conn).await } - // this is a hidden function, made to get "around the annoying "implementation of `Acquire` - // is not general enough" error", which is the error we get otherwise. - if let Err(migrate_error) = migrator.run_direct(&mut *conn).await { - error!(error=%migrate_error, "failed to run PostgreSQL migrations"); - return Err(MetastoreError::Internal { - message: "failed to run PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); + /// Spawns the one-shot, detached deferred-migration attempt. Fire-and-forget: it self-terminates + /// after one attempt or when the process exits. A failed attempt is logged and counted (see + /// run_deferred); it is not retried until the next restart. + fn spawn_deferred(&self) { + let connection_pool = self.connection_pool.clone(); + quickwit_common::spawn_named_task( + async move { + match Self::run_deferred(&connection_pool, get_deferred_migrations()).await { + DeferredOutcome::Applied => info!("deferred PostgreSQL migrations applied successfully"), + DeferredOutcome::LockNotAcquired => { + info!("deferred PostgreSQL migrations handled by another node") + } + DeferredOutcome::Failed => {} + } + }, + "postgres_deferred_migrations", + ); } - Ok(()) -} -async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { - let dirty = match conn.dirty_version().await { - Ok(dirty) => dirty, - Err(migrate_error) => { - error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + async fn run_deferred(pool: &TrackedPool, mut migrator: Migrator) -> DeferredOutcome { + let mut conn: PgConnection = match pool.acquire().await { + Ok(connection) => connection.detach(), + Err(error) => { + error!(%error, "failed to acquire connection for deferred PostgreSQL migrations"); + return DeferredOutcome::Failed; + } + }; - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); + // this query returns either true or false: we got the lock, or we didn't. It's dropped when + // the connection is dropped. + let acquired: bool = match sqlx::query_scalar("SELECT pg_try_advisory_lock($1, $2)") + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_1) + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_2) + .fetch_one(&mut conn) + .await + { + Ok(acquired) => acquired, + Err(error) => { + error!(%error, "failed to query deferred-migration advisory lock"); + return DeferredOutcome::Failed; + } + }; + if !acquired { + return DeferredOutcome::LockNotAcquired; } - }; - if let Some(dirty) = dirty { - error!("migration {dirty} is dirty"); - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!("migration {dirty} is dirty"), - }); - }; - let applied_migrations = match conn.list_applied_migrations().await { - Ok(applied_migrations) => applied_migrations, - Err(migrate_error) => { - error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); + // we've taken out our own lock; we don't need sqlx to also get one + migrator.set_locking(false); + match do_migrations(migrator, &mut conn).await { + Ok(()) => DeferredOutcome::Applied, + Err(_) => { + DEFERRED_MIGRATIONS_APPLY_ERRORS.inc(); + DeferredOutcome::Failed + } } - }; + } +} + +fn internal_error(message: &'static str, cause: impl std::fmt::Display) -> MetastoreError { + MetastoreError::Internal { + message: message.to_string(), + cause: cause.to_string(), + } +} + +/// Runs a migrator's pending up-migrations on `conn`. Shared by the required and deferred tracks. +/// +/// `ignore_missing` tolerates applied rows absent from this migrator's set: rows from the other +/// track, or migrations from a newer image after a rollback. +async fn do_migrations(mut migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { + migrator.set_ignore_missing(true); + // run_direct takes a Migrate connection directly, sidestepping run()'s Acquire bound that our + // pooled connection type doesn't satisfy. + migrator.run_direct(conn).await.map_err(|migrate_error| { + error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + internal_error("failed to run PostgreSQL migrations", migrate_error) + }) +} + +async fn check_migrations(migrator: &Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { + let dirty = conn.dirty_version().await.map_err(|migrate_error| { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + internal_error("failed to validate PostgreSQL migrations", migrate_error) + })?; + if let Some(dirty) = dirty { + error!("migration {dirty} is dirty"); + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!("migration {dirty} is dirty"), + )); + } + let applied_migrations = conn.list_applied_migrations().await.map_err(|migrate_error| { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + internal_error("failed to validate PostgreSQL migrations", migrate_error) + })?; let applied_by_version: BTreeMap<_, _> = applied_migrations .iter() .map(|applied_migration| (applied_migration.version, applied_migration)) @@ -128,230 +198,266 @@ async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> Metast { let Some(applied_migration) = applied_by_version.get(&expected_migration.version) else { error!("missing required migration {}", expected_migration.version); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!("missing required migration {}", expected_migration.version), - }); + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!("missing required migration {}", expected_migration.version), + )); }; if expected_migration.checksum != applied_migration.checksum { error!( "migration {} differs between database and expected value", expected_migration.version ); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!( "migration {} differs between database and expected value", expected_migration.version ), - }); + )); } } Ok(()) } -enum DeferredOutcome { - LockNotAcquired, - Applied, -} - -pub(super) async fn run_deferred_migrations( - pool: &TrackedPool, - mut migrator: Migrator, -) -> MetastoreResult { - let mut conn: PgConnection = pool.acquire().await?.detach(); - - // this query returns either true or false: we got the lock, or we didn't. - let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1, $2)") - .bind(DEFERRED_MIGRATIONS_LOCK_KEY_1) - .bind(DEFERRED_MIGRATIONS_LOCK_KEY_2) - .fetch_one(&mut conn) - .await - .map_err(|error| MetastoreError::Internal { - message: "failed to query deferred-migration advisory lock".to_string(), - cause: error.to_string(), - })?; - if !acquired { - return Ok(DeferredOutcome::LockNotAcquired); - } - - // we've taken out our own lock; we don't need sqlx to also try to do so. - migrator.set_locking(false); - migrator.set_ignore_missing(true); - let run_result = migrator.run_direct(&mut conn).await; - - // dropping the detached connection closes the session, which also releases the lock - drop(conn); - - if let Err(migrate_error) = run_result { - error!(error=%migrate_error, "failed to run deferred PostgreSQL migrations"); - return Err(MetastoreError::Internal { - message: "failed to run deferred PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); - } - Ok(DeferredOutcome::Applied) -} - -/// Spawns the one-shot, detached deferred-migration attempt. Fire-and-forget: -/// self-terminates after one attempt or when the process exits. -/// TODO: Figure out what to do here on error etc. -pub(super) fn spawn_deferred_migrations(pool: TrackedPool) { - quickwit_common::spawn_named_task( - async move { - match run_deferred_migrations(&pool, get_deferred_migrations()).await { - Ok(DeferredOutcome::Applied) => info!("deferred PostgreSQL migrations applied"), - Ok(DeferredOutcome::LockNotAcquired) => { - info!("deferred PostgreSQL migrations handled by another node") - } - Err(error) => error!(%error, "deferred PostgreSQL migrations failed"), - } - }, - "postgres_deferred_migrations", - ); -} - #[cfg(test)] mod tests { use std::time::Duration; use quickwit_common::uri::Uri; - use sqlx::Acquire; use sqlx::migrate::{Migrate, Migrator}; + use sqlx::{Acquire, Postgres}; - use super::{DeferredOutcome, get_migrations, run_deferred_migrations, run_migrations}; + use super::{DeferredOutcome, Migrations, TrackedPool}; use crate::metastore::postgres::utils::establish_connection; - #[tokio::test] - #[serial_test::file_serial] - async fn test_metastore_check_migration() { - let _ = tracing_subscriber::fmt::try_init(); - + fn test_uri() -> Uri { dotenvy::dotenv().ok(); - let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") + std::env::var("QW_TEST_DATABASE_URL") .expect("environment variable `QW_TEST_DATABASE_URL` should be set") .parse() - .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI") + } - { - let connection_pool = - establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, false) - .await - .unwrap(); - // make sure migrations are run - run_migrations(&connection_pool, false, false) + async fn test_pool(read_only: bool) -> TrackedPool { + establish_connection(&test_uri(), 1, 5, Duration::from_secs(2), None, None, read_only) + .await + .unwrap() + } + + fn migrations(connection_pool: &TrackedPool, skip_migrations: bool) -> Migrations { + Migrations { + connection_pool: connection_pool.clone(), + skip_migrations, + skip_locking: false, + skip_deferred: false, + } + } + + // A single controlled migration so these tests don't depend on the production migration set. + // Migrator::new loads the SQL into memory, so the tempdir can be dropped afterward. + async fn test_migrator() -> Migrator { + let migrations_dir = tempfile::tempdir().unwrap(); + std::fs::write( + migrations_dir.path().join("90000_test.up.sql"), + "CREATE TABLE IF NOT EXISTS qw_test_migration_marker (id INT)", + ) + .unwrap(); + std::fs::write( + migrations_dir.path().join("90000_test.down.sql"), + "DROP TABLE IF EXISTS qw_test_migration_marker", + ) + .unwrap(); + Migrator::new(migrations_dir.path()).await.unwrap() + } + + // Removes a bookkeeping row, tolerating a fresh database where the table doesn't exist yet. + async fn delete_migration_row(connection_pool: &TrackedPool, version: i64) { + let migrations_table_exists: bool = + sqlx::query_scalar("SELECT to_regclass('_sqlx_migrations') IS NOT NULL") + .fetch_one(connection_pool) + .await + .unwrap(); + if migrations_table_exists { + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = $1") + .bind(version) + .execute(connection_pool) .await .unwrap(); + } + } - // we just ran migration, nothing else to run - run_migrations(&connection_pool, true, false).await.unwrap(); + // Pre-clean so run_required has to (re)apply the test migration from scratch. + async fn reset_test_migration(connection_pool: &TrackedPool) { + sqlx::query("DROP TABLE IF EXISTS qw_test_migration_marker") + .execute(connection_pool) + .await + .unwrap(); + delete_migration_row(connection_pool, 90000).await; + } - // an unknown high-version row (a deferred migration, or a newer rolled-back image) is - // tolerated: the required-track check only asserts required migrations are present - sqlx::query( - "INSERT INTO _sqlx_migrations (version, description, success, checksum, \ - execution_time) VALUES (999999, 'tolerance test row', true, '\\x00'::bytea, 0)", - ) - .execute(&connection_pool) + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_applies_migrations() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + + migrations(&pool, false) + .run_required(test_migrator().await) .await .unwrap(); - run_migrations(&connection_pool, true, false).await.unwrap(); - sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 999999") - .execute(&connection_pool) - .await - .unwrap(); - let migrations = get_migrations(); - let last_migration = migrations - .iter() - .map(|migration| migration.version) - .max() - .expect("no migration exists?"); - let up_migration = migrations - .iter() - .find(|migration| { - migration.version == last_migration - && migration.migration_type.is_up_migration() - }) - .unwrap(); - let down_migration = migrations - .iter() - .find(|migration| { - migration.version == last_migration - && migration.migration_type.is_down_migration() - }) - .unwrap(); - let mut conn = connection_pool.acquire().await.unwrap(); + // the migration actually ran: its marker table now exists (we dropped it above, so this + // proves run_required recreated it) + sqlx::query("SELECT 1 FROM qw_test_migration_marker LIMIT 1") + .fetch_optional(&pool) + .await + .unwrap(); + } - conn.revert(down_migration).await.unwrap(); + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_validates_applied_migrations() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + // nothing left to run; validation passes + migrations(&pool, true) + .run_required(test_migrator().await) + .await + .unwrap(); + } - run_migrations(&connection_pool, true, false) - .await - .unwrap_err(); + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_tolerates_unknown_migration() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); - conn.apply(up_migration).await.unwrap(); - } + // an unknown high-version row (a deferred migration, or a newer rolled-back image) is + // tolerated: the required-track check only asserts required migrations are present + sqlx::query( + "INSERT INTO _sqlx_migrations (version, description, success, checksum, \ + execution_time) VALUES (999999, 'tolerance test row', true, '\\x00'::bytea, 0)", + ) + .execute(&pool) + .await + .unwrap(); + let result = migrations(&pool, true) + .run_required(test_migrator().await) + .await; + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 999999") + .execute(&pool) + .await + .unwrap(); + result.unwrap(); + } - { - let connection_pool = - establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true) - .await - .unwrap(); - // error because we are in read only mode, and we try to run migrations - run_migrations(&connection_pool, false, false) - .await - .unwrap_err(); - // okay because all migrations were already run before - run_migrations(&connection_pool, true, false).await.unwrap(); - } + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_fails_on_missing_required_migration() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + + // revert the only migration so validation finds it missing + let migrator = test_migrator().await; + let down_migration = migrator + .iter() + .find(|migration| migration.migration_type.is_down_migration()) + .unwrap(); + let mut conn = pool.acquire().await.unwrap(); + conn.revert(down_migration).await.unwrap(); + + let result = migrations(&pool, true) + .run_required(test_migrator().await) + .await; + reset_test_migration(&pool).await; + result.unwrap_err(); } #[tokio::test] #[serial_test::file_serial] - async fn test_deferred_migrations_elect_single_runner() { + async fn test_run_required_apply_fails_when_read_only() { let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(true).await; + // writing migrations fails in read-only mode + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap_err(); + } - dotenvy::dotenv().ok(); - let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") - .expect("environment variable `QW_TEST_DATABASE_URL` should be set") - .parse() - .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_validates_when_read_only() { + let _ = tracing_subscriber::fmt::try_init(); + let writable_pool = test_pool(false).await; + reset_test_migration(&writable_pool).await; + // apply the test migration first, using a writable connection + migrations(&writable_pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + // validation only reads, so it succeeds even when read-only + migrations(&test_pool(true).await, true) + .run_required(test_migrator().await) + .await + .unwrap(); + reset_test_migration(&writable_pool).await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_deferred_migrations_elect_single_runner() { + let _ = tracing_subscriber::fmt::try_init(); let connection_pool = - establish_connection(&uri, 2, 5, Duration::from_secs(5), None, None, false) + establish_connection(&test_uri(), 2, 5, Duration::from_secs(5), None, None, false) .await .unwrap(); - // in case a previous attempt ran before us - sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 90001") + // in case a previous attempt left state behind + sqlx::query("DROP TABLE IF EXISTS qw_test_deferred_marker") .execute(&connection_pool) .await .unwrap(); + delete_migration_row(&connection_pool, 90001).await; - // keeps the lock winner busy while the other node tries to acquire it + // the migration creates a marker table (to prove it ran) and sleeps (to keep the lock + // winner busy while the other node tries to acquire the lock) let migrations_dir = tempfile::tempdir().unwrap(); std::fs::write( migrations_dir.path().join("90001_slow_marker.up.sql"), - "SELECT pg_sleep(1)", + "CREATE TABLE IF NOT EXISTS qw_test_deferred_marker (id INT); SELECT pg_sleep(1)", ) .unwrap(); let (result_1, result_2) = tokio::join!( - run_deferred_migrations( + Migrations::run_deferred( &connection_pool, Migrator::new(migrations_dir.path()).await.unwrap() ), - run_deferred_migrations( + Migrations::run_deferred( &connection_pool, Migrator::new(migrations_dir.path()).await.unwrap() ), ); - let outcomes = [ - result_1.expect("first deferred run should not error"), - result_2.expect("second deferred run should not error"), - ]; + let outcomes = [result_1, result_2]; let applied = outcomes .iter() .filter(|outcome| matches!(outcome, DeferredOutcome::Applied)) @@ -362,5 +468,11 @@ mod tests { .count(); assert_eq!(applied, 1, "exactly one runner should win the lock and apply"); assert_eq!(not_acquired, 1, "the other runner should not acquire the lock"); + + // the winning runner actually applied the migration: its marker table now exists + sqlx::query("SELECT 1 FROM qw_test_deferred_marker LIMIT 1") + .fetch_optional(&connection_pool) + .await + .unwrap(); } } From 9b95962d4ae2072d31f1ce863913e65243031132 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 2 Jul 2026 08:27:47 -0400 Subject: [PATCH 3/4] comments, lints --- .../src/metastore/postgres/metastore.rs | 2 +- .../src/metastore/postgres/migrator.rs | 68 ++++++++++++------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index ef4f6bd5230..b06aa11a792 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -61,6 +61,7 @@ use time::OffsetDateTime; use tracing::{debug, info, instrument, warn}; use uuid::Uuid; +use super::QW_POSTGRES_READ_ONLY_ENV_KEY; use super::error::convert_sqlx_err; use super::migrator::Migrations; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; @@ -68,7 +69,6 @@ use super::parquet_model::{InsertableParquetSplit, ParquetSplitRecord, PgParquet use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters_and_order_by, establish_connection}; -use super::QW_POSTGRES_READ_ONLY_ENV_KEY; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index 5fdc7e21645..fc55e1f2387 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -32,8 +32,8 @@ use super::{ // ignore deferred migrations. The locks are implemented using Postgres advisory locks. // // Advisory locks are Postgres locks on custom defined resources, represented by a key of two ints. -// These two ints are arbitrary magic (but stable) numbers that make up the lock for running deferred -// migrations. +// These two ints are arbitrary magic (but stable) numbers that make up the key for the deferred +// migrations lock. const DEFERRED_MIGRATIONS_LOCK_KEY_1: i32 = 424242; const DEFERRED_MIGRATIONS_LOCK_KEY_2: i32 = 1; @@ -51,8 +51,6 @@ enum DeferredOutcome { Failed, } -/// Runs the PostgreSQL metastore migrations: the required ones synchronously (gating readiness) -/// and, unless disabled, the deferred ones in a detached background task. pub(super) struct Migrations { connection_pool: TrackedPool, skip_migrations: bool, @@ -70,8 +68,7 @@ impl Migrations { } } - /// Runs the required migrations, then spawns the deferred migrations unless disabled. Deferred - /// election relies on advisory locks, so it is skipped whenever migration locking is. + /// Runs the required migrations, and tries to also run the deferred migrations. pub(super) async fn run(&self) -> MetastoreResult<()> { self.run_required(get_migrations()).await?; @@ -81,6 +78,7 @@ impl Migrations { Ok(()) } + /// Required migrations are lightweight and critical to the system, and must be applied. async fn run_required(&self, mut migrator: Migrator) -> MetastoreResult<()> { let mut conn = self.connection_pool.acquire().await?; @@ -93,15 +91,16 @@ impl Migrations { do_migrations(migrator, &mut conn).await } - /// Spawns the one-shot, detached deferred-migration attempt. Fire-and-forget: it self-terminates - /// after one attempt or when the process exits. A failed attempt is logged and counted (see - /// run_deferred); it is not retried until the next restart. + /// Spawns the task to apply deferred migrations. A failure in applying these migrations is not + /// fatal. fn spawn_deferred(&self) { let connection_pool = self.connection_pool.clone(); quickwit_common::spawn_named_task( async move { match Self::run_deferred(&connection_pool, get_deferred_migrations()).await { - DeferredOutcome::Applied => info!("deferred PostgreSQL migrations applied successfully"), + DeferredOutcome::Applied => { + info!("deferred PostgreSQL migrations applied successfully") + } DeferredOutcome::LockNotAcquired => { info!("deferred PostgreSQL migrations handled by another node") } @@ -112,6 +111,9 @@ impl Migrations { ); } + /// Apply the deferred migrations. We try to get the lock; if we do, we're the migration leader + /// and attempt to apply it. If we didn't, that means another pod did, and we can go about our + /// day. async fn run_deferred(pool: &TrackedPool, mut migrator: Migrator) -> DeferredOutcome { let mut conn: PgConnection = match pool.acquire().await { Ok(connection) => connection.detach(), @@ -143,7 +145,8 @@ impl Migrations { migrator.set_locking(false); match do_migrations(migrator, &mut conn).await { Ok(()) => DeferredOutcome::Applied, - Err(_) => { + Err(error) => { + error!(%error, "failed to apply deferred migrations"); DEFERRED_MIGRATIONS_APPLY_ERRORS.inc(); DeferredOutcome::Failed } @@ -158,10 +161,9 @@ fn internal_error(message: &'static str, cause: impl std::fmt::Display) -> Metas } } -/// Runs a migrator's pending up-migrations on `conn`. Shared by the required and deferred tracks. -/// -/// `ignore_missing` tolerates applied rows absent from this migrator's set: rows from the other -/// track, or migrations from a newer image after a rollback. +/// Runs a migrator's pending up-migrations on `conn`. +/// `ignore_missing` is how we are able to ensure forwards/backwards compatibility - we don't error +/// in the case a migration in the db is not present in our migration set. async fn do_migrations(mut migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { migrator.set_ignore_missing(true); // run_direct takes a Migrate connection directly, sidestepping run()'s Acquire bound that our @@ -184,10 +186,13 @@ async fn check_migrations(migrator: &Migrator, conn: &mut PgConnection) -> Metas format!("migration {dirty} is dirty"), )); } - let applied_migrations = conn.list_applied_migrations().await.map_err(|migrate_error| { - error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); - internal_error("failed to validate PostgreSQL migrations", migrate_error) - })?; + let applied_migrations = conn + .list_applied_migrations() + .await + .map_err(|migrate_error| { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + internal_error("failed to validate PostgreSQL migrations", migrate_error) + })?; let applied_by_version: BTreeMap<_, _> = applied_migrations .iter() .map(|applied_migration| (applied_migration.version, applied_migration)) @@ -240,9 +245,17 @@ mod tests { } async fn test_pool(read_only: bool) -> TrackedPool { - establish_connection(&test_uri(), 1, 5, Duration::from_secs(2), None, None, read_only) - .await - .unwrap() + establish_connection( + &test_uri(), + 1, + 5, + Duration::from_secs(2), + None, + None, + read_only, + ) + .await + .unwrap() } fn migrations(connection_pool: &TrackedPool, skip_migrations: bool) -> Migrations { @@ -287,7 +300,6 @@ mod tests { } } - // Pre-clean so run_required has to (re)apply the test migration from scratch. async fn reset_test_migration(connection_pool: &TrackedPool) { sqlx::query("DROP TABLE IF EXISTS qw_test_migration_marker") .execute(connection_pool) @@ -466,8 +478,14 @@ mod tests { .iter() .filter(|outcome| matches!(outcome, DeferredOutcome::LockNotAcquired)) .count(); - assert_eq!(applied, 1, "exactly one runner should win the lock and apply"); - assert_eq!(not_acquired, 1, "the other runner should not acquire the lock"); + assert_eq!( + applied, 1, + "exactly one runner should win the lock and apply" + ); + assert_eq!( + not_acquired, 1, + "the other runner should not acquire the lock" + ); // the winning runner actually applied the migration: its marker table now exists sqlx::query("SELECT 1 FROM qw_test_deferred_marker LIMIT 1") From 50775793dd5aba7a5af216aee7b22076ea64da67 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 2 Jul 2026 11:22:48 -0400 Subject: [PATCH 4/4] PR comments --- quickwit/Makefile | 3 +- .../migrations/postgresql_deferred/README.md | 17 +++- .../src/metastore/postgres/metastore.rs | 2 +- .../src/metastore/postgres/metrics.rs | 7 +- .../src/metastore/postgres/migrator.rs | 92 ++++++++++--------- 5 files changed, 71 insertions(+), 50 deletions(-) diff --git a/quickwit/Makefile b/quickwit/Makefile index c4f3b4a6531..f555caa7981 100644 --- a/quickwit/Makefile +++ b/quickwit/Makefile @@ -28,8 +28,7 @@ migrations-lint: @echo "Linting Postgres migrations with diesel-guard" @(command -v diesel-guard >/dev/null || (echo "diesel-guard is not installed. Please install using 'cargo install diesel-guard'." && exit 1)) @diesel-guard check quickwit-metastore/migrations/postgresql/ - @if ls quickwit-metastore/migrations/postgresql_deferred/*.sql >/dev/null 2>&1; then \ - diesel-guard check quickwit-metastore/migrations/postgresql_deferred/; fi + @diesel-guard check quickwit-metastore/migrations/postgresql_deferred/ unused-deps: @echo "Checking for unused dependencies" diff --git a/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md index dd72e838905..128fb27b0aa 100644 --- a/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md +++ b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md @@ -3,5 +3,20 @@ Long, degrade-gracefully-only migrations (e.g. `CREATE INDEX CONCURRENTLY`) run in a background task after readiness. Both tracks share the `_sqlx_migrations` table. - Version must be globally unique across both dirs (continue the single sequence). -- Must be idempotent: `-- no-transaction` first line, then `DROP INDEX CONCURRENTLY IF EXISTS ...; CREATE INDEX CONCURRENTLY IF NOT EXISTS ...`. +- Must be idempotent. A statement that can't run in a transaction (e.g. `CREATE INDEX CONCURRENTLY`) needs `-- no-transaction` as the first line, then the DDL. Each statement auto-commits, so a killed migration must be safe to re-run (`CREATE INDEX CONCURRENTLY` can leave an invalid index, so drop it first). For example: + +29_add_foo_index.up.sql + ```sql + -- no-transaction + DROP INDEX CONCURRENTLY IF EXISTS foo_idx; + CREATE INDEX CONCURRENTLY IF NOT EXISTS foo_idx ON foo (bar); + ``` + +29_add_foo_index.down.sql + ```sql + -- no-transaction + DROP INDEX CONCURRENTLY IF EXISTS foo_idx; + ``` + + The `DROP` before the `CREATE` looks like it would drop the index on every run, but it does not: sqlx applies each migration at most once (tracked by version in `_sqlx_migrations`) and never re-runs one that already succeeded. The drop only matters on a retry after a partial failure. If a `CREATE INDEX CONCURRENTLY` is killed midway, Postgres leaves an *invalid* index and sqlx records nothing (the bookkeeping row is written only on success), so the next attempt re-runs the migration. A bare `CREATE INDEX CONCURRENTLY IF NOT EXISTS` would then see that invalid index and skip it forever, so we drop it first to force a clean rebuild. - Never depend on an unshipped required migration; required migrations must never depend on a deferred one. \ No newline at end of file diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index b06aa11a792..62d0d85fe12 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -132,7 +132,7 @@ impl PostgresqlMetastore { ) .await?; - Migrations::new(connection_pool.clone()).run().await?; + Migrations::from_env(connection_pool.clone()).run().await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index c7bbebeba86..b264632ca64 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -14,9 +14,10 @@ use quickwit_metrics::{LazyCounter, LazyGauge, lazy_counter, lazy_gauge}; -pub(super) static DEFERRED_MIGRATIONS_APPLY_ERRORS: LazyCounter = lazy_counter!( - name: "deferred_migrations_apply_errors_total", - description: "Number of failed deferred PostgreSQL migration attempts.", +// Counts deferred-migration apply attempts, labeled by `result` ("success"/"failure"). +pub(super) static DEFERRED_MIGRATIONS_APPLY: LazyCounter = lazy_counter!( + name: "deferred_migrations_apply_total", + description: "Number of deferred PostgreSQL migration attempts, by result.", subsystem: "metastore", ); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index fc55e1f2387..342973483ef 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -15,12 +15,13 @@ use std::collections::BTreeMap; use quickwit_common::get_bool_from_env; +use quickwit_metrics::{counter, labels}; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sqlx::migrate::{Migrate, Migrator}; use sqlx::{Acquire, PgConnection, Postgres}; use tracing::{error, info}; -use super::metrics::DEFERRED_MIGRATIONS_APPLY_ERRORS; +use super::metrics::DEFERRED_MIGRATIONS_APPLY; use super::pool::TrackedPool; use super::{ QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, @@ -35,7 +36,7 @@ use super::{ // These two ints are arbitrary magic (but stable) numbers that make up the key for the deferred // migrations lock. const DEFERRED_MIGRATIONS_LOCK_KEY_1: i32 = 424242; -const DEFERRED_MIGRATIONS_LOCK_KEY_2: i32 = 1; +const DEFERRED_MIGRATIONS_LOCK_KEY_2: i32 = 1789; fn get_migrations() -> Migrator { sqlx::migrate!("migrations/postgresql") @@ -47,8 +48,8 @@ fn get_deferred_migrations() -> Migrator { enum DeferredOutcome { LockNotAcquired, - Applied, - Failed, + Success, + Failure, } pub(super) struct Migrations { @@ -59,8 +60,8 @@ pub(super) struct Migrations { } impl Migrations { - pub(super) fn new(connection_pool: TrackedPool) -> Self { - Migrations { + pub(super) fn from_env(connection_pool: TrackedPool) -> Self { + Self { connection_pool, skip_migrations: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false), skip_locking: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false), @@ -73,7 +74,13 @@ impl Migrations { self.run_required(get_migrations()).await?; if !self.skip_migrations && !self.skip_locking && !self.skip_deferred { - self.spawn_deferred(); + let connection_pool = self.connection_pool.clone(); + quickwit_common::spawn_named_task( + async move { + Self::run_deferred(&connection_pool, get_deferred_migrations()).await; + }, + "postgres_deferred_migrations", + ); } Ok(()) } @@ -88,27 +95,7 @@ impl Migrations { if self.skip_migrations { return check_migrations(&migrator, &mut conn).await; } - do_migrations(migrator, &mut conn).await - } - - /// Spawns the task to apply deferred migrations. A failure in applying these migrations is not - /// fatal. - fn spawn_deferred(&self) { - let connection_pool = self.connection_pool.clone(); - quickwit_common::spawn_named_task( - async move { - match Self::run_deferred(&connection_pool, get_deferred_migrations()).await { - DeferredOutcome::Applied => { - info!("deferred PostgreSQL migrations applied successfully") - } - DeferredOutcome::LockNotAcquired => { - info!("deferred PostgreSQL migrations handled by another node") - } - DeferredOutcome::Failed => {} - } - }, - "postgres_deferred_migrations", - ); + run_migrations(migrator, &mut conn).await } /// Apply the deferred migrations. We try to get the lock; if we do, we're the migration leader @@ -116,14 +103,17 @@ impl Migrations { /// day. async fn run_deferred(pool: &TrackedPool, mut migrator: Migrator) -> DeferredOutcome { let mut conn: PgConnection = match pool.acquire().await { + // Postgres will automatically unlock the advisory lock on a session end/conn drop. + // Detaching the connection scopes it to this one session, which guarantees Postgres + // will automatically unlock for us, including on failure/panic. Ok(connection) => connection.detach(), Err(error) => { error!(%error, "failed to acquire connection for deferred PostgreSQL migrations"); - return DeferredOutcome::Failed; + return DeferredOutcome::Failure; } }; - // this query returns either true or false: we got the lock, or we didn't. It's dropped when + // This query returns either true or false: we got the lock, or we didn't. It's dropped when // the connection is dropped. let acquired: bool = match sqlx::query_scalar("SELECT pg_try_advisory_lock($1, $2)") .bind(DEFERRED_MIGRATIONS_LOCK_KEY_1) @@ -134,50 +124,63 @@ impl Migrations { Ok(acquired) => acquired, Err(error) => { error!(%error, "failed to query deferred-migration advisory lock"); - return DeferredOutcome::Failed; + return DeferredOutcome::Failure; } }; if !acquired { + info!("deferred PostgreSQL migrations handled by another node"); return DeferredOutcome::LockNotAcquired; } - // we've taken out our own lock; we don't need sqlx to also get one + // We've taken out our own lock; we don't need sqlx to also get one migrator.set_locking(false); - match do_migrations(migrator, &mut conn).await { - Ok(()) => DeferredOutcome::Applied, + match run_migrations(migrator, &mut conn).await { + Ok(()) => { + info!("deferred PostgreSQL migrations applied successfully"); + counter!(parent: DEFERRED_MIGRATIONS_APPLY, labels: [labels!("result" => "success")]) + .inc(); + DeferredOutcome::Success + } Err(error) => { error!(%error, "failed to apply deferred migrations"); - DEFERRED_MIGRATIONS_APPLY_ERRORS.inc(); - DeferredOutcome::Failed + counter!(parent: DEFERRED_MIGRATIONS_APPLY, labels: [labels!("result" => "failure")]) + .inc(); + DeferredOutcome::Failure } } } } -fn internal_error(message: &'static str, cause: impl std::fmt::Display) -> MetastoreError { +fn internal_error(message: &'static str, cause: String) -> MetastoreError { MetastoreError::Internal { message: message.to_string(), - cause: cause.to_string(), + cause, } } /// Runs a migrator's pending up-migrations on `conn`. /// `ignore_missing` is how we are able to ensure forwards/backwards compatibility - we don't error /// in the case a migration in the db is not present in our migration set. -async fn do_migrations(mut migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { +async fn run_migrations(mut migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { migrator.set_ignore_missing(true); // run_direct takes a Migrate connection directly, sidestepping run()'s Acquire bound that our // pooled connection type doesn't satisfy. migrator.run_direct(conn).await.map_err(|migrate_error| { error!(error=%migrate_error, "failed to run PostgreSQL migrations"); - internal_error("failed to run PostgreSQL migrations", migrate_error) + internal_error( + "failed to run PostgreSQL migrations", + migrate_error.to_string(), + ) }) } async fn check_migrations(migrator: &Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { let dirty = conn.dirty_version().await.map_err(|migrate_error| { error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); - internal_error("failed to validate PostgreSQL migrations", migrate_error) + internal_error( + "failed to validate PostgreSQL migrations", + migrate_error.to_string(), + ) })?; if let Some(dirty) = dirty { error!("migration {dirty} is dirty"); @@ -191,7 +194,10 @@ async fn check_migrations(migrator: &Migrator, conn: &mut PgConnection) -> Metas .await .map_err(|migrate_error| { error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); - internal_error("failed to validate PostgreSQL migrations", migrate_error) + internal_error( + "failed to validate PostgreSQL migrations", + migrate_error.to_string(), + ) })?; let applied_by_version: BTreeMap<_, _> = applied_migrations .iter() @@ -472,7 +478,7 @@ mod tests { let outcomes = [result_1, result_2]; let applied = outcomes .iter() - .filter(|outcome| matches!(outcome, DeferredOutcome::Applied)) + .filter(|outcome| matches!(outcome, DeferredOutcome::Success)) .count(); let not_acquired = outcomes .iter()