diff --git a/quickwit/Makefile b/quickwit/Makefile index e09001ffe0b..f555caa7981 100644 --- a/quickwit/Makefile +++ b/quickwit/Makefile @@ -28,6 +28,7 @@ migrations-lint: @echo "Linting Postgres migrations with diesel-guard" @(command -v diesel-guard >/dev/null || (echo "diesel-guard is not installed. Please install using 'cargo install diesel-guard'." && exit 1)) @diesel-guard check quickwit-metastore/migrations/postgresql/ + @diesel-guard check quickwit-metastore/migrations/postgresql_deferred/ unused-deps: @echo "Checking for unused dependencies" diff --git a/quickwit/quickwit-metastore/README.md b/quickwit/quickwit-metastore/README.md index 0743ec9d66c..2af73ddeda2 100644 --- a/quickwit/quickwit-metastore/README.md +++ b/quickwit/quickwit-metastore/README.md @@ -53,3 +53,13 @@ You can then use the following commands to apply/revert your postgresql migratio sqlx migrate run --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql sqlx migrate revert --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql ``` + +## Deferred migrations + +`migrations/postgresql_deferred` holds long-running migrations whose absence only degrades the service gracefully (e.g. `CREATE INDEX CONCURRENTLY`). +They run in a background task after readiness, on a single pod elected by a Postgres advisory lock. They share the +`_sqlx_migrations` table with the required track, so version numbers must be globally unique across both dirs. Each +migration must also be idempotent, because an interrupted migration is re-run on the next attempt. See `migrations/postgresql_deferred/README.md` for authoring rules. 
+``` +sqlx migrate run --database-url postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev --source migrations/postgresql_deferred +``` diff --git a/quickwit/quickwit-metastore/build.rs b/quickwit/quickwit-metastore/build.rs index c1c9cc14375..889a77a6f99 100644 --- a/quickwit/quickwit-metastore/build.rs +++ b/quickwit/quickwit-metastore/build.rs @@ -14,4 +14,5 @@ fn main() { println!("cargo:rerun-if-changed=migrations/postgresql"); + println!("cargo:rerun-if-changed=migrations/postgresql_deferred"); } diff --git a/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md new file mode 100644 index 00000000000..128fb27b0aa --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql_deferred/README.md @@ -0,0 +1,22 @@ +# Deferred PostgreSQL migrations + +Long-running migrations whose absence only degrades the service gracefully (e.g. `CREATE INDEX CONCURRENTLY`) live here and run in a background task after readiness. The deferred track and the required track (`migrations/postgresql`) share the `_sqlx_migrations` table. + +- Version must be globally unique across both dirs (continue the single sequence). +- Must be idempotent. A migration containing a statement that can't run in a transaction (e.g. `CREATE INDEX CONCURRENTLY`) needs `-- no-transaction` as the first line of the file, followed by the DDL. Each statement then auto-commits, so a killed migration must be safe to re-run (`CREATE INDEX CONCURRENTLY` can leave an invalid index, so drop it first). For example: + +29_add_foo_index.up.sql + ```sql + -- no-transaction + DROP INDEX CONCURRENTLY IF EXISTS foo_idx; + CREATE INDEX CONCURRENTLY IF NOT EXISTS foo_idx ON foo (bar); + ``` + +29_add_foo_index.down.sql + ```sql + -- no-transaction + DROP INDEX CONCURRENTLY IF EXISTS foo_idx; + ``` + + The `DROP` before the `CREATE` looks like it would drop the index on every run, but it does not: sqlx applies each migration at most once (tracked by version in `_sqlx_migrations`) and never re-runs one that already succeeded. 
The drop only matters on a retry after a partial failure. If a `CREATE INDEX CONCURRENTLY` is killed midway, Postgres leaves an *invalid* index and sqlx records nothing (the bookkeeping row is written only on success), so the next attempt re-runs the migration. A bare `CREATE INDEX CONCURRENTLY IF NOT EXISTS` would then see that invalid index and skip it forever, so we drop it first to force a clean rebuild. +- Never depend on an unshipped required migration; required migrations must never depend on a deferred one. \ No newline at end of file diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index de080bfcc4e..62d0d85fe12 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -61,17 +61,14 @@ use time::OffsetDateTime; use tracing::{debug, info, instrument, warn}; use uuid::Uuid; +use super::QW_POSTGRES_READ_ONLY_ENV_KEY; use super::error::convert_sqlx_err; -use super::migrator::run_migrations; +use super::migrator::Migrations; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; use super::parquet_model::{InsertableParquetSplit, ParquetSplitRecord, PgParquetSplit}; use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters_and_order_by, establish_connection}; -use super::{ - QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, - QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, -}; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; @@ -123,8 +120,6 @@ impl PostgresqlMetastore { .expect("PostgreSQL metastore config should have been validated"); let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); - let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); - let skip_locking = 
get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); let connection_pool = establish_connection( connection_uri, @@ -137,7 +132,7 @@ impl PostgresqlMetastore { ) .await?; - run_migrations(&connection_pool, skip_migrations, skip_locking).await?; + Migrations::from_env(connection_pool.clone()).run().await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index b7647d4320c..b264632ca64 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -12,7 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use quickwit_metrics::{LazyGauge, lazy_gauge}; +use quickwit_metrics::{LazyCounter, LazyGauge, lazy_counter, lazy_gauge}; + +// Counts deferred-migration apply attempts, labeled by `result` ("success"/"failure"). 
+pub(super) static DEFERRED_MIGRATIONS_APPLY: LazyCounter = lazy_counter!( + name: "deferred_migrations_apply_total", + description: "Number of deferred PostgreSQL migration attempts, by result.", + subsystem: "metastore", +); pub(super) static ACQUIRE_CONNECTIONS: LazyGauge = lazy_gauge!( name: "acquire_connections", diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index 03c40eec666..342973483ef 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -14,135 +14,218 @@ use std::collections::BTreeMap; +use quickwit_common::get_bool_from_env; +use quickwit_metrics::{counter, labels}; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sqlx::migrate::{Migrate, Migrator}; use sqlx::{Acquire, PgConnection, Postgres}; -use tracing::{error, instrument}; +use tracing::{error, info}; +use super::metrics::DEFERRED_MIGRATIONS_APPLY; use super::pool::TrackedPool; +use super::{ + QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, + QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, +}; + +// The deferred migrations should be attempted by only one metastore pod. We do that by having +// metastore pods try to acquire a lock to run the deferred migrations. Pods that don't get the lock +// ignore deferred migrations. The locks are implemented using Postgres advisory locks. +// +// Advisory locks are Postgres locks on custom defined resources, represented by a key of two ints. +// These two ints are arbitrary magic (but stable) numbers that make up the key for the deferred +// migrations lock. 
+const DEFERRED_MIGRATIONS_LOCK_KEY_1: i32 = 424242; +const DEFERRED_MIGRATIONS_LOCK_KEY_2: i32 = 1789; fn get_migrations() -> Migrator { sqlx::migrate!("migrations/postgresql") } -/// Initializes the database and runs the SQL migrations stored in the -/// `quickwit-metastore/migrations` directory. -#[instrument(skip_all)] -pub(super) async fn run_migrations( - pool: &TrackedPool, +fn get_deferred_migrations() -> Migrator { + sqlx::migrate!("migrations/postgresql_deferred") +} + +enum DeferredOutcome { + LockNotAcquired, + Success, + Failure, +} + +pub(super) struct Migrations { + connection_pool: TrackedPool, skip_migrations: bool, skip_locking: bool, -) -> MetastoreResult<()> { - let mut tx = pool.begin().await?; - let conn = tx.acquire().await?; + skip_deferred: bool, +} - let mut migrator = get_migrations(); +impl Migrations { + pub(super) fn from_env(connection_pool: TrackedPool) -> Self { + Self { + connection_pool, + skip_migrations: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false), + skip_locking: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false), + skip_deferred: get_bool_from_env(QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY, false), + } + } - if skip_locking { - migrator.set_locking(false); + /// Runs the required migrations, then spawns the deferred ones in the background if enabled. + pub(super) async fn run(&self) -> MetastoreResult<()> { + self.run_required(get_migrations()).await?; + + if !self.skip_migrations && !self.skip_locking && !self.skip_deferred { + let connection_pool = self.connection_pool.clone(); + quickwit_common::spawn_named_task( + async move { + Self::run_deferred(&connection_pool, get_deferred_migrations()).await; + }, + "postgres_deferred_migrations", + ); + } + Ok(()) + } + + /// Required migrations are lightweight and critical to the system, and must be applied. 
+ async fn run_required(&self, mut migrator: Migrator) -> MetastoreResult<()> { + let mut conn = self.connection_pool.acquire().await?; + + if self.skip_locking { + migrator.set_locking(false); + } + if self.skip_migrations { + return check_migrations(&migrator, &mut conn).await; + } + run_migrations(migrator, &mut conn).await } - if !skip_migrations { - // this is an hidden function, made to get "around the annoying "implementation of `Acquire` - // is not general enough" error", which is the error we get otherwise. - let migrate_result = migrator.run_direct(conn).await; + /// Applies the deferred migrations. We try to take the advisory lock; if we get it, we're the + /// migration leader and attempt to apply them. If not, another session holds the lock, so we + /// skip them and return `DeferredOutcome::LockNotAcquired`. + async fn run_deferred(pool: &TrackedPool, mut migrator: Migrator) -> DeferredOutcome { + let mut conn: PgConnection = match pool.acquire().await { + // Postgres releases session-level advisory locks when the session ends. Detaching takes + // the connection out of the pool, so dropping it ends the session rather than reusing + // it, and the lock is released on every exit path, including failure/panic. + Ok(connection) => connection.detach(), + Err(error) => { + error!(%error, "failed to acquire connection for deferred PostgreSQL migrations"); + return DeferredOutcome::Failure; + } + }; - let Err(migrate_error) = migrate_result else { - tx.commit().await?; - return Ok(()); + // Returns true if we took the lock, false if another session already holds it (it never + // blocks). The lock is held until this connection is dropped. 
+ let acquired: bool = match sqlx::query_scalar("SELECT pg_try_advisory_lock($1, $2)") + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_1) + .bind(DEFERRED_MIGRATIONS_LOCK_KEY_2) + .fetch_one(&mut conn) + .await + { + Ok(acquired) => acquired, + Err(error) => { + error!(%error, "failed to query deferred-migration advisory lock"); + return DeferredOutcome::Failure; + } }; - tx.rollback().await?; - error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + if !acquired { + info!("deferred PostgreSQL migrations handled by another node"); + return DeferredOutcome::LockNotAcquired; + } - Err(MetastoreError::Internal { - message: "failed to run PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }) - } else { - check_migrations(migrator, conn).await + // We already hold our own advisory lock, so sqlx's built-in migration lock is redundant. + migrator.set_locking(false); + match run_migrations(migrator, &mut conn).await { + Ok(()) => { + info!("deferred PostgreSQL migrations applied successfully"); + counter!(parent: DEFERRED_MIGRATIONS_APPLY, labels: [labels!("result" => "success")]) + .inc(); + DeferredOutcome::Success + } + Err(error) => { + error!(%error, "failed to apply deferred migrations"); + counter!(parent: DEFERRED_MIGRATIONS_APPLY, labels: [labels!("result" => "failure")]) + .inc(); + DeferredOutcome::Failure + } + } } } -async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { - let dirty = match conn.dirty_version().await { - Ok(dirty) => dirty, - Err(migrate_error) => { - error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); +fn internal_error(message: &'static str, cause: String) -> MetastoreError { + MetastoreError::Internal { + message: message.to_string(), + cause, + } +} - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); - } - }; +/// Runs a migrator's pending up-migrations on `conn`. 
+/// `ignore_missing` is how we are able to ensure forwards/backwards compatibility - we don't error +/// in the case a migration in the db is not present in our migration set. +async fn run_migrations(mut migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { + migrator.set_ignore_missing(true); + // run_direct takes a Migrate connection directly, sidestepping run()'s Acquire bound that our + // pooled connection type doesn't satisfy. + migrator.run_direct(conn).await.map_err(|migrate_error| { + error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + internal_error( + "failed to run PostgreSQL migrations", + migrate_error.to_string(), + ) + }) +} + +async fn check_migrations(migrator: &Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { + let dirty = conn.dirty_version().await.map_err(|migrate_error| { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + internal_error( + "failed to validate PostgreSQL migrations", + migrate_error.to_string(), + ) + })?; if let Some(dirty) = dirty { error!("migration {dirty} is dirty"); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!("migration {dirty} is dirty"), - }); - }; - let applied_migrations = match conn.list_applied_migrations().await { - Ok(applied_migrations) => applied_migrations, - Err(migrate_error) => { + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!("migration {dirty} is dirty"), + )); + } + let applied_migrations = conn + .list_applied_migrations() + .await + .map_err(|migrate_error| { error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }); - } - }; - let expected_migrations: BTreeMap<_, _> = migrator + internal_error( + "failed to validate PostgreSQL migrations", + 
migrate_error.to_string(), + ) + })?; + let applied_by_version: BTreeMap<_, _> = applied_migrations .iter() - .filter(|migration| migration.migration_type.is_up_migration()) - .map(|migration| (migration.version, migration)) + .map(|applied_migration| (applied_migration.version, applied_migration)) .collect(); - if applied_migrations.len() < expected_migrations.len() { - error!( - "missing migrations, expected {} migrations, only {} present in database", - expected_migrations.len(), - applied_migrations.len() - ); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( - "missing migrations, expected {} migrations, only {} present in database", - expected_migrations.len(), - applied_migrations.len() - ), - }); - } - for applied_migration in applied_migrations { - let Some(migration) = expected_migrations.get(&applied_migration.version) else { - error!( - "found unknown migration {} in database", - applied_migration.version - ); - - return Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( - "found unknown migration {} in database", - applied_migration.version - ), - }); + for expected_migration in migrator + .iter() + .filter(|migration| migration.migration_type.is_up_migration()) + { + let Some(applied_migration) = applied_by_version.get(&expected_migration.version) else { + error!("missing required migration {}", expected_migration.version); + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!("missing required migration {}", expected_migration.version), + )); }; - if migration.checksum != applied_migration.checksum { + if expected_migration.checksum != applied_migration.checksum { error!( - "migration {} differ between database and expected value", - applied_migration.version + "migration {} differs between database and expected value", + expected_migration.version ); - - return 
Err(MetastoreError::Internal { - message: "failed to validate PostgreSQL migrations".to_string(), - cause: format!( - "migration {} differ between database and expected value", - applied_migration.version + return Err(internal_error( + "failed to validate PostgreSQL migrations", + format!( + "migration {} differs between database and expected value", + expected_migration.version ), - }); + )); } } Ok(()) @@ -153,78 +236,267 @@ mod tests { use std::time::Duration; use quickwit_common::uri::Uri; - use sqlx::Acquire; - use sqlx::migrate::Migrate; + use sqlx::migrate::{Migrate, Migrator}; + use sqlx::{Acquire, Postgres}; - use super::{get_migrations, run_migrations}; + use super::{DeferredOutcome, Migrations, TrackedPool}; use crate::metastore::postgres::utils::establish_connection; - #[tokio::test] - #[serial_test::file_serial] - async fn test_metastore_check_migration() { - let _ = tracing_subscriber::fmt::try_init(); - + fn test_uri() -> Uri { dotenvy::dotenv().ok(); - let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") + std::env::var("QW_TEST_DATABASE_URL") .expect("environment variable `QW_TEST_DATABASE_URL` should be set") .parse() - .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI") + } - { - let connection_pool = - establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, false) - .await - .unwrap(); - // make sure migrations are run - run_migrations(&connection_pool, false, false) - .await - .unwrap(); + async fn test_pool(read_only: bool) -> TrackedPool { + establish_connection( + &test_uri(), + 1, + 5, + Duration::from_secs(2), + None, + None, + read_only, + ) + .await + .unwrap() + } + + fn migrations(connection_pool: &TrackedPool, skip_migrations: bool) -> Migrations { + Migrations { + connection_pool: connection_pool.clone(), + skip_migrations, + skip_locking: false, + skip_deferred: false, + } + } - // we just ran migration, nothing else 
to run - run_migrations(&connection_pool, true, false).await.unwrap(); - - let migrations = get_migrations(); - let last_migration = migrations - .iter() - .map(|migration| migration.version) - .max() - .expect("no migration exists?"); - let up_migration = migrations - .iter() - .find(|migration| { - migration.version == last_migration - && migration.migration_type.is_up_migration() - }) + // A single controlled migration so these tests don't depend on the production migration set. + // Migrator::new loads the SQL into memory, so the tempdir can be dropped afterward. + async fn test_migrator() -> Migrator { + let migrations_dir = tempfile::tempdir().unwrap(); + std::fs::write( + migrations_dir.path().join("90000_test.up.sql"), + "CREATE TABLE IF NOT EXISTS qw_test_migration_marker (id INT)", + ) + .unwrap(); + std::fs::write( + migrations_dir.path().join("90000_test.down.sql"), + "DROP TABLE IF EXISTS qw_test_migration_marker", + ) + .unwrap(); + Migrator::new(migrations_dir.path()).await.unwrap() + } + + // Removes a bookkeeping row, tolerating a fresh database where the table doesn't exist yet. 
+ async fn delete_migration_row(connection_pool: &TrackedPool, version: i64) { + let migrations_table_exists: bool = + sqlx::query_scalar("SELECT to_regclass('_sqlx_migrations') IS NOT NULL") + .fetch_one(connection_pool) + .await .unwrap(); - let down_migration = migrations - .iter() - .find(|migration| { - migration.version == last_migration - && migration.migration_type.is_down_migration() - }) + if migrations_table_exists { + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = $1") + .bind(version) + .execute(connection_pool) + .await .unwrap(); - let mut conn = connection_pool.acquire().await.unwrap(); + } + } - conn.revert(down_migration).await.unwrap(); + async fn reset_test_migration(connection_pool: &TrackedPool) { + sqlx::query("DROP TABLE IF EXISTS qw_test_migration_marker") + .execute(connection_pool) + .await + .unwrap(); + delete_migration_row(connection_pool, 90000).await; + } - run_migrations(&connection_pool, true, false) - .await - .unwrap_err(); + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_applies_migrations() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; - conn.apply(up_migration).await.unwrap(); - } + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); - { - let connection_pool = - establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true) - .await - .unwrap(); - // error because we are in read only mode, and we try to run migrations - run_migrations(&connection_pool, false, false) + // the migration actually ran: its marker table now exists (we dropped it above, so this + // proves run_required recreated it) + sqlx::query("SELECT 1 FROM qw_test_migration_marker LIMIT 1") + .fetch_optional(&pool) + .await + .unwrap(); + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_validates_applied_migrations() { + let _ = tracing_subscriber::fmt::try_init(); + let 
pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + // nothing left to run; validation passes + migrations(&pool, true) + .run_required(test_migrator().await) + .await + .unwrap(); + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_tolerates_unknown_migration() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + + // an unknown high-version row (a deferred migration, or a newer rolled-back image) is + // tolerated: the required-track check only asserts required migrations are present + sqlx::query( + "INSERT INTO _sqlx_migrations (version, description, success, checksum, \ + execution_time) VALUES (999999, 'tolerance test row', true, '\\x00'::bytea, 0)", + ) + .execute(&pool) + .await + .unwrap(); + let result = migrations(&pool, true) + .run_required(test_migrator().await) + .await; + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 999999") + .execute(&pool) + .await + .unwrap(); + result.unwrap(); + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_fails_on_missing_required_migration() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(false).await; + reset_test_migration(&pool).await; + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + + // revert the only migration so validation finds it missing + let migrator = test_migrator().await; + let down_migration = migrator + .iter() + .find(|migration| migration.migration_type.is_down_migration()) + .unwrap(); + let mut conn = pool.acquire().await.unwrap(); + conn.revert(down_migration).await.unwrap(); + + let result = migrations(&pool, true) + .run_required(test_migrator().await) + .await; + reset_test_migration(&pool).await; + 
result.unwrap_err(); + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_apply_fails_when_read_only() { + let _ = tracing_subscriber::fmt::try_init(); + let pool = test_pool(true).await; + // writing migrations fails in read-only mode + migrations(&pool, false) + .run_required(test_migrator().await) + .await + .unwrap_err(); + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_run_required_validates_when_read_only() { + let _ = tracing_subscriber::fmt::try_init(); + let writable_pool = test_pool(false).await; + reset_test_migration(&writable_pool).await; + // apply the test migration first, using a writable connection + migrations(&writable_pool, false) + .run_required(test_migrator().await) + .await + .unwrap(); + // validation only reads, so it succeeds even when read-only + migrations(&test_pool(true).await, true) + .run_required(test_migrator().await) + .await + .unwrap(); + reset_test_migration(&writable_pool).await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_deferred_migrations_elect_single_runner() { + let _ = tracing_subscriber::fmt::try_init(); + let connection_pool = + establish_connection(&test_uri(), 2, 5, Duration::from_secs(5), None, None, false) .await - .unwrap_err(); - // okay because all migrations were already run before - run_migrations(&connection_pool, true, false).await.unwrap(); - } + .unwrap(); + + // in case a previous attempt left state behind + sqlx::query("DROP TABLE IF EXISTS qw_test_deferred_marker") + .execute(&connection_pool) + .await + .unwrap(); + delete_migration_row(&connection_pool, 90001).await; + + // the migration creates a marker table (to prove it ran) and sleeps (to keep the lock + // winner busy while the other node tries to acquire the lock) + let migrations_dir = tempfile::tempdir().unwrap(); + std::fs::write( + migrations_dir.path().join("90001_slow_marker.up.sql"), + "CREATE TABLE IF NOT EXISTS qw_test_deferred_marker (id INT); SELECT 
pg_sleep(1)", + ) + .unwrap(); + + let (result_1, result_2) = tokio::join!( + Migrations::run_deferred( + &connection_pool, + Migrator::new(migrations_dir.path()).await.unwrap() + ), + Migrations::run_deferred( + &connection_pool, + Migrator::new(migrations_dir.path()).await.unwrap() + ), + ); + + let outcomes = [result_1, result_2]; + let applied = outcomes + .iter() + .filter(|outcome| matches!(outcome, DeferredOutcome::Success)) + .count(); + let not_acquired = outcomes + .iter() + .filter(|outcome| matches!(outcome, DeferredOutcome::LockNotAcquired)) + .count(); + assert_eq!( + applied, 1, + "exactly one runner should win the lock and apply" + ); + assert_eq!( + not_acquired, 1, + "the other runner should not acquire the lock" + ); + + // the winning runner actually applied the migration: its marker table now exists + sqlx::query("SELECT 1 FROM qw_test_deferred_marker LIMIT 1") + .fetch_optional(&connection_pool) + .await + .unwrap(); } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index a1c99997f3a..d05080174c5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -29,4 +29,5 @@ pub use metastore::PostgresqlMetastore; const QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATIONS"; const QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATION_LOCKING"; +const QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_DEFERRED_MIGRATIONS"; const QW_POSTGRES_READ_ONLY_ENV_KEY: &str = "QW_POSTGRES_READ_ONLY";