From bdb1746711cef527fe35940203f8233f7ffe793d Mon Sep 17 00:00:00 2001 From: Aria Stewart Date: Fri, 22 May 2026 09:56:07 -0400 Subject: [PATCH 1/2] Add bottomless restore interruption integration tests Add integration tests for libsql-server bottomless replication restore behavior when interrupted by various failure modes. Tests verify sqld can resume and complete an interrupted restore from S3-compatible object storage (minio) without requiring a restart. Test cases: - basic_restore: Sanity check that sqld restores from minio - sqld_interrupted: sqld killed mid-restore, restarted, completes - minio_interrupted: minio stopped mid-restore, restarted, sqld retries - network_partition: sqld disconnected from network mid-restore, reconnected Infrastructure: - Docker-based fixtures with isolated networks per test - Unique container/network names and ports via atomic counters - Port mapping (not host networking) for isolation - Automatic cleanup of Docker resources after each test Files added: - tests/bottomless/mod.rs - tests/bottomless/fixtures.rs - tests/bottomless/basic_restore.rs - tests/bottomless/sqld_interrupted.rs - tests/bottomless/minio_interrupted.rs - tests/bottomless/network_partition.rs - tests/bottomless/README.md Files modified: - tests/tests.rs: Add bottomless module - Cargo.toml: Add reqwest dev-dependency, remove duplicate hex --- Cargo.lock | 102 ++++ libsql-server/Cargo.toml | 2 + libsql-server/tests/bottomless/README.md | 47 ++ .../tests/bottomless/basic_restore.rs | 61 ++ libsql-server/tests/bottomless/fixtures.rs | 560 ++++++++++++++++++ .../tests/bottomless/minio_interrupted.rs | 76 +++ libsql-server/tests/bottomless/mod.rs | 5 + .../tests/bottomless/network_partition.rs | 113 ++++ .../tests/bottomless/sqld_interrupted.rs | 78 +++ libsql-server/tests/tests.rs | 1 + 10 files changed, 1045 insertions(+) create mode 100644 libsql-server/tests/bottomless/README.md create mode 100644 libsql-server/tests/bottomless/basic_restore.rs create mode 100644 libsql-server/tests/bottomless/fixtures.rs create mode 100644 libsql-server/tests/bottomless/minio_interrupted.rs create mode 100644 libsql-server/tests/bottomless/mod.rs create mode 100644 libsql-server/tests/bottomless/network_partition.rs create mode 100644 libsql-server/tests/bottomless/sqld_interrupted.rs diff --git a/Cargo.lock b/Cargo.lock index 9d411cd66c..967f7c1c61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2048,6 +2048,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2564,6 +2579,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tungstenite" version = "0.11.1" @@ -3039,6 +3067,7 @@ dependencies = [ "futures-core", "hashbrown 0.14.5", "hdrhistogram", + "hex", "hmac", "http-body 0.4.6", "hyper", @@ -3446,6 +3475,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3606,12 +3652,49 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "openssl" +version = "0.10.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a45fa2aa886c42762255da344f0a0d313e254066c46aad76f300c3d3da62d967" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.116" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28a22dc7140cda5f096e5e7724a6962ca81a7f8bfd2979f9b18c11af56318c4" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "3.9.2" @@ -4283,10 +4366,12 @@ dependencies = [ "http-body 0.4.6", "hyper", "hyper-rustls 0.24.2", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -4298,6 +4383,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -5268,6 +5354,16 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -5759,6 +5855,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vergen" version = "8.3.2" diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index 06107d0557..2d051c3e16 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -99,6 +99,7 @@ indicatif = "0.17.8" [dev-dependencies] arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] } env_logger = "0.10" +hex = "0.4" hyper = { workspace = true, features = ["client"] } insta = { version = "1.26.0", features = ["json"] } libsql = { path = "../libsql/"} @@ -114,6 +115,7 @@ s3s-fs = "0.8.1" ring = { version = "0.17.8", features = ["std"] } tonic-build = "0.11" prost-build = "0.12" +reqwest = { version = "0.11", features = ["json"] } [build-dependencies] vergen = { version = "8", features = ["build", "git", "gitcl"] } diff --git a/libsql-server/tests/bottomless/README.md b/libsql-server/tests/bottomless/README.md new file mode 100644 index 0000000000..f0d73f6f3d --- /dev/null +++ b/libsql-server/tests/bottomless/README.md @@ -0,0 +1,47 @@ +# Bottomless Restore Interruption Tests + +Integration tests for libsql-server's (sqld) bottomless S3 backup/restore feature. These tests verify that sqld correctly handles interrupted restores from object storage (minio). + +## Test Cases + +### 1. `basic_restore` +Sanity check - verifies sqld can restore a database from minio after local files are deleted. + +### 2. `sqld_interrupted` +Simulates sqld crashing mid-restore (SIGKILL). After restart, sqld must detect the incomplete restore and complete it successfully. + +### 3. `minio_interrupted` +Minio (S3) is stopped mid-restore, then restarted. sqld must retry and complete the restore once S3 is available again. + +### 4. `network_partition` +The sqld container is disconnected from the Docker network mid-restore (simulating a network partition), then reconnected. sqld must resume and complete the restore without requiring a restart. + +## Running Tests + +```bash +# Run all bottomless tests +cargo test --test tests bottomless -- --nocapture + +# Run individual tests +cargo test --test tests bottomless::basic_restore -- --nocapture +cargo test --test tests bottomless::sqld_interrupted -- --nocapture +cargo test --test tests bottomless::minio_interrupted -- --nocapture +cargo test --test tests bottomless::network_partition -- --nocapture +``` + +## Architecture + +- **MinIO**: Runs in a Docker container with a dedicated Docker network per test +- **sqld**: Runs in a Docker container on the same network, with port mapping for HTTP access +- Each test gets unique container/network names and ports to avoid conflicts when running in parallel +- Data directory is a temp directory mounted into the sqld container + +## Prerequisites + +- Docker daemon running +- Port range 20000-30000 available on localhost + +## Notes + +- Tests never skip or ignore. If the Docker environment is unavailable, they fail loudly. +- All temporary containers and networks are cleaned up after each test. diff --git a/libsql-server/tests/bottomless/basic_restore.rs b/libsql-server/tests/bottomless/basic_restore.rs new file mode 100644 index 0000000000..2c193248e6 --- /dev/null +++ b/libsql-server/tests/bottomless/basic_restore.rs @@ -0,0 +1,61 @@ +use super::fixtures::{MinioFixture, SqldFixture, TestDatabase}; +use std::time::Duration; + +#[tokio::test] +async fn test_basic_restore() { + let _ = tracing_subscriber::fmt::try_init(); + + let minio = MinioFixture::start().await.expect("Failed to start minio"); + + let data_dir = tempfile::tempdir().expect("Failed to create temp dir"); + let mut sqld = SqldFixture::new(&minio); + + // Phase 1: Create database and replicate to minio + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld"); + sqld.wait_for_ready(Duration::from_secs(30)) + .await + .expect("sqld did not become ready"); + + let endpoint = sqld.http_endpoint(); + let db = TestDatabase::new(endpoint.clone()); + db.create_schema().await.expect("Failed to create schema"); + db.insert_test_data(100) + .await + .expect("Failed to insert data"); + db.wait_for_replication() + .await + .expect("Failed to wait for replication"); + + sqld.stop().await.expect("Failed to stop sqld"); + + // Phase 2: Delete local database files to force restore + sqld.cleanup_data_dir(data_dir.path()) + .await + .expect("Failed to cleanup dbs dir"); + + // Phase 3: Start sqld - should restore from minio + let endpoint2 = sqld.http_endpoint(); + sqld.start(data_dir.path()) + .await + .expect("Failed to restart sqld"); + sqld.wait_for_ready(Duration::from_secs(60)) + .await + .expect("sqld did not become ready after restore"); + + // Phase 4: Verify database is intact + let db2 = TestDatabase::new(endpoint2); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let restored_data = db2.query_all().await.expect("Failed to query data"); + assert_eq!(restored_data.len(), 100, "Expected 100 rows after restore"); + + db2.verify_integrity() + .await + .expect("Data integrity check failed"); + + sqld.cleanup().await.ok(); + minio.cleanup().await.ok(); +} diff --git a/libsql-server/tests/bottomless/fixtures.rs b/libsql-server/tests/bottomless/fixtures.rs new file mode 100644 index 0000000000..ed6d8a1401 --- /dev/null +++ b/libsql-server/tests/bottomless/fixtures.rs @@ -0,0 +1,560 @@ +use std::path::Path; +use std::sync::atomic::{AtomicU16, AtomicU64, Ordering}; +use std::time::Duration; + +static PORT_COUNTER: AtomicU16 = AtomicU16::new(0); +static ID_COUNTER: AtomicU64 = AtomicU64::new(0); + +fn next_port() -> u16 { + let counter = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); + 20000 + (counter % 10000) +} + +fn unique_id() -> String { + use std::time::SystemTime; + let ts = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let pid = std::process::id(); + let counter = ID_COUNTER.fetch_add(1, Ordering::SeqCst); + format!("{}-{}-{}", pid, counter, ts) +} + +pub struct MinioFixture { + pub container_name: String, + pub network_name: String, + api_port: u16, + console_port: u16, +} + +impl MinioFixture { + pub async fn start() -> anyhow::Result { + let uid = unique_id(); + let api_port = next_port(); + let console_port = next_port(); + let container_name = format!("minio-test-{}", uid); + let network_name = format!("sqld-net-{}", uid); + + // Create Docker network + let net_output = tokio::process::Command::new("docker") + .args(["network", "create", &network_name]) + .output() + .await?; + if !net_output.status.success() { + anyhow::bail!( + "Failed to create Docker network: {}", + String::from_utf8_lossy(&net_output.stderr) + ); + } + + // Start minio container + let run_output = tokio::process::Command::new("docker") + .args([ + "run", + "-d", + "--name", + &container_name, + "--network", + &network_name, + "-p", + &format!("{}:9000", api_port), + "-p", + &format!("{}:9001", console_port), + "-e", + "MINIO_ROOT_USER=minioadmin", + "-e", + "MINIO_ROOT_PASSWORD=minioadmin", + "quay.io/minio/minio:latest", + "server", + "/data", + "--console-address", + ":9001", + ]) + .output() + .await?; + + if !run_output.status.success() { + let _ = tokio::process::Command::new("docker") + .args(["network", "rm", &network_name]) + .output() + .await; + anyhow::bail!( + "Failed to start minio container: {}", + String::from_utf8_lossy(&run_output.stderr) + ); + } + + // Wait for minio to be ready + let client = reqwest::Client::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while tokio::time::Instant::now() < deadline { + match client + .get(format!("http://127.0.0.1:{}/minio/health/live", api_port)) + .send() + .await + { + Ok(resp) if resp.status().is_success() => break, + _ => tokio::time::sleep(Duration::from_millis(200)).await, + } + } + + tokio::time::sleep(Duration::from_secs(1)).await; + + // Create bucket using mc + let mc_output = tokio::process::Command::new("docker") + .args([ + "run", + "--rm", + "--network", + &network_name, + "quay.io/minio/mc:latest", + "alias", + "set", + "myminio", + &format!("http://{}:9000", container_name), + "minioadmin", + "minioadmin", + ]) + .output() + .await?; + if !mc_output.status.success() { + tracing::warn!( + "mc alias set failed: {}", + String::from_utf8_lossy(&mc_output.stderr) + ); + } + + let mb_output = tokio::process::Command::new("docker") + .args([ + "run", + "--rm", + "--network", + &network_name, + "quay.io/minio/mc:latest", + "mb", + "myminio/bottomless", + ]) + .output() + .await?; + if !mb_output.status.success() { + tracing::warn!( + "mc mb failed: {}", + String::from_utf8_lossy(&mb_output.stderr) + ); + } + + Ok(Self { + container_name, + network_name, + api_port, + console_port, + }) + } + + pub fn endpoint(&self) -> String { + format!("http://127.0.0.1:{}", self.api_port) + } + + pub fn internal_endpoint(&self) -> String { + format!("http://{}:9000", self.container_name) + } + + pub async fn stop(&self) -> anyhow::Result<()> { + let output = tokio::process::Command::new("docker") + .args(["stop", "-t", "5", &self.container_name]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to stop minio: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + Ok(()) + } + + pub async fn restart(&self) -> anyhow::Result<()> { + let output = tokio::process::Command::new("docker") + .args(["start", &self.container_name]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to restart minio: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + // Wait for minio to be ready + let client = reqwest::Client::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while tokio::time::Instant::now() < deadline { + match client + .get(format!( + "http://127.0.0.1:{}/minio/health/live", + self.api_port + )) + .send() + .await + { + Ok(resp) if resp.status().is_success() => return Ok(()), + _ => tokio::time::sleep(Duration::from_millis(200)).await, + } + } + anyhow::bail!("minio did not become ready after restart") + } + + pub async fn cleanup(&self) -> anyhow::Result<()> { + let _ = tokio::process::Command::new("docker") + .args(["rm", "-f", &self.container_name]) + .output() + .await; + let _ = tokio::process::Command::new("docker") + .args(["network", "rm", &self.network_name]) + .output() + .await; + Ok(()) + } +} + +pub struct SqldFixture<'a> { + minio: &'a MinioFixture, + http_port: u16, + pub container_name: String, +} + +impl<'a> SqldFixture<'a> { + pub fn new(minio: &'a MinioFixture) -> Self { + let http_port = next_port(); + Self { + minio, + http_port, + container_name: format!("sqld-test-{}", unique_id()), + } + } + + pub async fn start(&mut self, data_dir: &Path) -> anyhow::Result<()> { + // Remove any existing container + let _ = tokio::process::Command::new("docker") + .args(["rm", "-f", &self.container_name]) + .output() + .await; + + let data_dir_str = data_dir.to_str().unwrap(); + let network_name = &self.minio.network_name; + + let run_output = tokio::process::Command::new("docker") + .args([ + "run", + "-d", + "--name", + &self.container_name, + "--network", + network_name, + "-p", + &format!("{}:8080", self.http_port), + "-e", + &format!( + "LIBSQL_BOTTOMLESS_ENDPOINT={}", + self.minio.internal_endpoint() + ), + "-e", + "LIBSQL_BOTTOMLESS_BUCKET=bottomless", + "-e", + "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID=minioadmin", + "-e", + "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY=minioadmin", + "-e", + "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION=us-east-1", + "-e", + "SQLD_ENABLE_BOTTOMLESS_REPLICATION=true", + "-e", + "SQLD_DB_PATH=/var/lib/sqld", + "-v", + &format!("{}:/var/lib/sqld", data_dir_str), + "ghcr.io/tursodatabase/libsql-server:latest", + ]) + .output() + .await?; + + if !run_output.status.success() { + anyhow::bail!( + "Failed to start sqld container: {}", + String::from_utf8_lossy(&run_output.stderr) + ); + } + + Ok(()) + } + + pub async fn kill(&self) -> anyhow::Result<()> { + let output = tokio::process::Command::new("docker") + .args(["kill", &self.container_name]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to kill sqld: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + Ok(()) + } + + pub async fn stop(&self) -> anyhow::Result<()> { + let output = tokio::process::Command::new("docker") + .args(["stop", "-t", "30", &self.container_name]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to stop sqld: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + Ok(()) + } + + pub async fn restart(&mut self) -> anyhow::Result<()> { + let output = tokio::process::Command::new("docker") + .args(["start", &self.container_name]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to restart sqld: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + Ok(()) + } + + pub async fn wait_for_ready(&self, timeout: Duration) -> anyhow::Result<()> { + let deadline = tokio::time::Instant::now() + timeout; + let client = reqwest::Client::new(); + while tokio::time::Instant::now() < deadline { + match client + .get(format!("http://127.0.0.1:{}/health", self.http_port)) + .send() + .await + { + Ok(resp) if resp.status().is_success() => return Ok(()), + _ => tokio::time::sleep(Duration::from_millis(100)).await, + } + } + anyhow::bail!("sqld did not become ready within {:?}", timeout) + } + + pub async fn wait_for_restore_start(&self) -> anyhow::Result<()> { + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while tokio::time::Instant::now() < deadline { + let output = tokio::process::Command::new("docker") + .args(["logs", &self.container_name]) + .output() + .await?; + let logs = String::from_utf8_lossy(&output.stdout); + if logs.contains("Restoring from generation") || logs.contains("restore") { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(200)).await; + } + anyhow::bail!("sqld did not start restoring within 30 seconds") + } + + pub async fn cleanup_data_dir(&self, data_dir: &Path) -> anyhow::Result<()> { + let data_dir_str = data_dir.to_str().unwrap(); + let output = tokio::process::Command::new("docker") + .args([ + "run", + "--rm", + "-v", + &format!("{}:/data", data_dir_str), + "alpine", + "sh", + "-c", + "find /data -type f -name '*.db' -delete; find /data -type f -name '*.db-journal' -delete; find /data -type f -name '*.db-wal' -delete; find /data -type f -name '*.db-shm' -delete", + ]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "Failed to cleanup data dir: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + Ok(()) + } + + pub fn http_endpoint(&self) -> String { + format!("http://127.0.0.1:{}", self.http_port) + } + + pub async fn cleanup(&self) -> anyhow::Result<()> { + let _ = tokio::process::Command::new("docker") + .args(["rm", "-f", &self.container_name]) + .output() + .await; + Ok(()) + } +} + +pub struct TestDatabase { + endpoint: String, + client: reqwest::Client, +} + +impl TestDatabase { + pub fn new(endpoint: String) -> Self { + Self { + endpoint, + client: reqwest::Client::new(), + } + } + + pub async fn create_schema(&self) -> anyhow::Result<()> { + self.execute_sql("DROP TABLE IF EXISTS test_data").await?; + self.execute_sql("CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT, data BLOB)") + .await?; + Ok(()) + } + + pub async fn insert_test_data(&self, count: usize) -> anyhow::Result<()> { + for i in 0..count { + let value = format!("test_value_{}", i); + let data = vec![0u8; 1024]; + let hex_data = hex::encode(&data); + self.execute_sql(&format!( + "INSERT INTO test_data (id, value, data) VALUES ({}, '{}', X'{}')", + i, value, hex_data + )) + .await?; + } + Ok(()) + } + + pub async fn query_all(&self) -> anyhow::Result> { + let resp = self.execute_sql("SELECT * FROM test_data").await?; + Ok(extract_rows(&resp)) + } + + pub async fn verify_integrity(&self) -> anyhow::Result<()> { + let resp = self + .execute_sql("SELECT COUNT(*) AS total FROM test_data") + .await?; + let rows = extract_rows(&resp); + let count = rows + .first() + .and_then(|r| r.get("total")) + .and_then(|c| { + c.as_i64() + .or_else(|| c.as_str().and_then(|s| s.parse::().ok())) + }) + .unwrap_or(0); + if count == 0 { + anyhow::bail!("Database is empty after restore"); + } + let resp = self + .execute_sql("SELECT value FROM test_data WHERE id = 42") + .await?; + let rows = extract_rows(&resp); + let value = rows + .first() + .and_then(|r| r.get("value")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + if value != "test_value_42" { + anyhow::bail!( + "Data integrity check failed: expected 'test_value_42', got '{}'", + value + ); + } + Ok(()) + } + + pub async fn wait_for_replication(&self) -> anyhow::Result<()> { + tokio::time::sleep(Duration::from_secs(3)).await; + Ok(()) + } + + async fn execute_sql(&self, sql: &str) -> anyhow::Result { + let body = serde_json::json!({ + "requests": [ + { "type": "execute", "stmt": { "sql": sql } }, + { "type": "close" } + ] + }); + let resp = self + .client + .post(format!("{}/v2/pipeline", self.endpoint)) + .json(&body) + .send() + .await?; + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + anyhow::bail!("SQL execution failed with status {}: {}", status, text); + } + let result: serde_json::Value = resp.json().await?; + if let Some(results) = result.get("results").and_then(|r| r.as_array()) { + for res in results { + if res.get("type") == Some(&serde_json::json!("error")) { + let error = res + .get("error") + .cloned() + .unwrap_or(serde_json::json!("unknown error")); + anyhow::bail!("SQL execution error: {}", error); + } + } + } + Ok(result) + } +} + +fn extract_rows(response: &serde_json::Value) -> Vec { + let mut rows = Vec::new(); + if let Some(results) = response.get("results").and_then(|r| r.as_array()) { + for result in results { + if result.get("type") == Some(&serde_json::json!("ok")) { + if let Some(resp) = result.get("response") { + if resp.get("type") == Some(&serde_json::json!("execute")) { + if let Some(result_data) = resp.get("result") { + if let Some(cols) = result_data.get("cols").and_then(|c| c.as_array()) { + if let Some(result_rows) = + result_data.get("rows").and_then(|r| r.as_array()) + { + for row in result_rows { + let mut obj = serde_json::Map::new(); + if let Some(cells) = row.as_array() { + for (i, col) in cols.iter().enumerate() { + let col_name = col + .get("name") + .and_then(|n| n.as_str()) + .unwrap_or("unknown"); + if let Some(cell) = cells.get(i) { + let value = if let Some(val_str) = + cell.get("value").and_then(|v| v.as_str()) + { + if let Ok(n) = val_str.parse::() { + serde_json::json!(n) + } else { + serde_json::json!(val_str) + } + } else { + cell.clone() + }; + obj.insert(col_name.to_string(), value); + } + } + } + rows.push(serde_json::Value::Object(obj)); + } + } + } + } + } + } + } + } + } + rows +} diff --git a/libsql-server/tests/bottomless/minio_interrupted.rs b/libsql-server/tests/bottomless/minio_interrupted.rs new file mode 100644 index 0000000000..cae8d7f29b --- /dev/null +++ b/libsql-server/tests/bottomless/minio_interrupted.rs @@ -0,0 +1,76 @@ +use super::fixtures::{MinioFixture, SqldFixture, TestDatabase}; +use std::time::Duration; + +#[tokio::test] +async fn test_restore_completes_after_minio_killed() { + let _ = tracing_subscriber::fmt::try_init(); + + let minio = MinioFixture::start().await.expect("Failed to start minio"); + + let data_dir = tempfile::tempdir().expect("Failed to create temp dir"); + let mut sqld = SqldFixture::new(&minio); + + // Phase 1: Create database and replicate to minio + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld"); + sqld.wait_for_ready(Duration::from_secs(30)) + .await + .expect("sqld did not become ready"); + + let endpoint = sqld.http_endpoint(); + let db = TestDatabase::new(endpoint.clone()); + db.create_schema().await.expect("Failed to create schema"); + db.insert_test_data(5000) + .await + .expect("Failed to insert data"); + db.wait_for_replication() + .await + .expect("Failed to wait for replication"); + + sqld.stop().await.expect("Failed to stop sqld"); + + // Phase 2: Delete local database files to force restore + sqld.cleanup_data_dir(data_dir.path()) + .await + .expect("Failed to cleanup dbs dir"); + + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld for restore"); + + // Give sqld time to start restoring + tokio::time::sleep(Duration::from_secs(5)).await; + + // Phase 3: Kill minio mid-restore (S3 becomes unavailable) + minio.stop().await.expect("Failed to stop minio"); + + // Phase 4: Wait a bit, then restart minio + tokio::time::sleep(Duration::from_secs(2)).await; + minio.restart().await.expect("Failed to restart minio"); + + // Phase 5: sqld should recover and complete restore + sqld.wait_for_ready(Duration::from_secs(120)) + .await + .expect("sqld did not become ready after minio interruption"); + + // Phase 6: Verify database is intact + let endpoint2 = sqld.http_endpoint(); + let db2 = TestDatabase::new(endpoint2); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let restored_data = db2.query_all().await.expect("Failed to query data"); + assert_eq!( + restored_data.len(), + 5000, + "Expected 5000 rows after minio interruption" + ); + + db2.verify_integrity() + .await + .expect("Data integrity check failed after minio interruption"); + + sqld.cleanup().await.ok(); + minio.cleanup().await.ok(); +} diff --git a/libsql-server/tests/bottomless/mod.rs b/libsql-server/tests/bottomless/mod.rs new file mode 100644 index 0000000000..4e94becfeb --- /dev/null +++ b/libsql-server/tests/bottomless/mod.rs @@ -0,0 +1,5 @@ +mod basic_restore; +mod fixtures; +mod minio_interrupted; +mod network_partition; +mod sqld_interrupted; diff --git a/libsql-server/tests/bottomless/network_partition.rs b/libsql-server/tests/bottomless/network_partition.rs new file mode 100644 index 0000000000..3ba2d7855d --- /dev/null +++ b/libsql-server/tests/bottomless/network_partition.rs @@ -0,0 +1,113 @@ +use super::fixtures::{MinioFixture, SqldFixture, TestDatabase}; +use std::time::Duration; + +#[tokio::test] +async fn test_restore_completes_after_network_partition() { + let _ = tracing_subscriber::fmt::try_init(); + + let minio = MinioFixture::start().await.expect("Failed to start minio"); + + let data_dir = tempfile::tempdir().expect("Failed to create temp dir"); + let mut sqld = SqldFixture::new(&minio); + + // Phase 1: Create database and replicate to minio + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld"); + sqld.wait_for_ready(Duration::from_secs(30)) + .await + .expect("sqld did not become ready"); + + let endpoint = sqld.http_endpoint(); + let db = TestDatabase::new(endpoint.clone()); + db.create_schema().await.expect("Failed to create schema"); + db.insert_test_data(1000) + .await + .expect("Failed to insert data"); + db.wait_for_replication() + .await + .expect("Failed to wait for replication"); + + sqld.stop().await.expect("Failed to stop sqld"); + + // Phase 2: Delete local database files to force restore + sqld.cleanup_data_dir(data_dir.path()) + .await + .expect("Failed to cleanup dbs dir"); + + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld for restore"); + + // Wait for restore to begin + sqld.wait_for_restore_start() + .await + .expect("sqld did not start restoring"); + + // Give restore a moment to progress + tokio::time::sleep(Duration::from_secs(1)).await; + + // Phase 3: Simulate network partition by disconnecting sqld from the Docker network + let disconnect_output = tokio::process::Command::new("docker") + .args([ + "network", + "disconnect", + &minio.network_name, + &sqld.container_name, + ]) + .output() + .await + .expect("Failed to disconnect sqld from network"); + if !disconnect_output.status.success() { + panic!( + "Failed to disconnect sqld from network: {}", + String::from_utf8_lossy(&disconnect_output.stderr) + ); + } + + // Wait a bit while sqld is partitioned + tokio::time::sleep(Duration::from_secs(3)).await; + + // Phase 4: Reconnect sqld to the network (partition heals) + let connect_output = tokio::process::Command::new("docker") + .args([ + "network", + "connect", + &minio.network_name, + &sqld.container_name, + ]) + .output() + .await + .expect("Failed to reconnect sqld to network"); + if !connect_output.status.success() { + panic!( + "Failed to reconnect sqld to network: {}", + String::from_utf8_lossy(&connect_output.stderr) + ); + } + + // Phase 5: sqld should recover and complete restore without restart + sqld.wait_for_ready(Duration::from_secs(120)) + .await + .expect("sqld did not become ready after network partition healed"); + + // Phase 6: Verify database is intact + let endpoint2 = sqld.http_endpoint(); + let db2 = TestDatabase::new(endpoint2); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let restored_data = db2.query_all().await.expect("Failed to query data"); + assert_eq!( + restored_data.len(), + 1000, + "Expected 1000 rows after network partition" + ); + + db2.verify_integrity() + .await + .expect("Data integrity check failed after network partition"); + + sqld.cleanup().await.ok(); + minio.cleanup().await.ok(); +} diff --git a/libsql-server/tests/bottomless/sqld_interrupted.rs b/libsql-server/tests/bottomless/sqld_interrupted.rs new file mode 100644 index 0000000000..c77b9890df --- /dev/null +++ b/libsql-server/tests/bottomless/sqld_interrupted.rs @@ -0,0 +1,78 @@ +use super::fixtures::{MinioFixture, SqldFixture, TestDatabase}; +use std::time::Duration; + +#[tokio::test] +async fn test_restore_completes_after_sqld_killed() { + let _ = tracing_subscriber::fmt::try_init(); + + let minio = MinioFixture::start().await.expect("Failed to start minio"); + + let data_dir = tempfile::tempdir().expect("Failed to create temp dir"); + let mut sqld = SqldFixture::new(&minio); + + // Phase 1: Create database and replicate to minio + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld"); + sqld.wait_for_ready(Duration::from_secs(30)) + .await + .expect("sqld did not become ready"); + + let endpoint = sqld.http_endpoint(); + let db = TestDatabase::new(endpoint.clone()); + db.create_schema().await.expect("Failed to create schema"); + db.insert_test_data(1000) + .await + .expect("Failed to insert data"); + db.wait_for_replication() + .await + .expect("Failed to wait for replication"); + + sqld.stop().await.expect("Failed to stop sqld"); + + // Phase 2: Delete local database files to force restore + sqld.cleanup_data_dir(data_dir.path()) + .await + .expect("Failed to cleanup dbs dir"); + + sqld.start(data_dir.path()) + .await + .expect("Failed to start sqld for restore"); + + // Wait for restore to begin + sqld.wait_for_restore_start() + .await + .expect("sqld did not start restoring"); + + // Give restore a moment to progress + tokio::time::sleep(Duration::from_secs(1)).await; + + // Phase 3: Kill sqld mid-restore (simulate crash) + sqld.kill().await.expect("Failed to kill sqld"); + + // Phase 4: Restart sqld - must complete restore + let endpoint2 = sqld.http_endpoint(); + sqld.restart().await.expect("Failed to restart sqld"); + sqld.wait_for_ready(Duration::from_secs(60)) + .await + .expect("sqld did not become ready after interrupted restore"); + + // Phase 5: Verify database is intact + let db2 = TestDatabase::new(endpoint2); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let restored_data = db2.query_all().await.expect("Failed to query data"); + assert_eq!( + restored_data.len(), + 1000, + "Expected 1000 rows after interrupted restore" + ); + + db2.verify_integrity() + .await + .expect("Data integrity check failed after interrupted restore"); + + sqld.cleanup().await.ok(); + minio.cleanup().await.ok(); +} diff --git a/libsql-server/tests/tests.rs b/libsql-server/tests/tests.rs index ab475df546..e6135d7849 100644 --- a/libsql-server/tests/tests.rs +++ b/libsql-server/tests/tests.rs @@ -4,6 +4,7 @@ mod common; mod auth; +mod bottomless; mod cluster; mod embedded_replica; mod hrana; From 9876625c238d957c6410ef4a1cd9abd9a7dc6072 Mon Sep 17 00:00:00 2001 From: Aria Stewart Date: Tue, 26 May 2026 14:18:59 -0400 Subject: [PATCH 2/2] Add S3 timeout config to bottomless replicator and fix integration tests - Add LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS (default 5s) - Add LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS (default 5s) - Add LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS (default 10s) - Configure TimeoutConfig on aws_sdk_s3::Config in bottomless::replicator::Options::client_config() - Update meta_store.rs Options construction to include new timeout fields - Remove #[ignore] from network_partition test - Fix test fixtures: endpoint timing, image caching, mut minio --- .gitignore | 2 +- Cargo.lock | 17 +-- bottomless/Cargo.toml | 1 + bottomless/src/replicator.rs | 22 +++ libsql-server/src/namespace/meta_store.rs | 3 + .../tests/bottomless/basic_restore.rs | 2 +- libsql-server/tests/bottomless/fixtures.rs | 129 +++++++++++++----- .../tests/bottomless/minio_interrupted.rs | 2 +- .../tests/bottomless/sqld_interrupted.rs | 2 +- 9 files changed, 137 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index f40057e543..5d6cd7904a 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,4 @@ libsql-sqlite3/**.o.tmp /bindings/c/generated /bindings/c/**.xcframework -/bindings/**/.DS_Store \ No newline at end of file +/bindings/**/.DS_Store diff --git a/Cargo.lock b/Cargo.lock index 967f7c1c61..8da3060bcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,9 +651,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "c7b8a53819e42f10d0821f56da995e1470b199686a1809168db6ca485665f042" dependencies = [ "base64-simd", "bytes", @@ -914,6 +914,7 @@ dependencies = [ "async-compression 0.4.11", "aws-config", "aws-sdk-s3", + "aws-smithy-types", "bytes", "chrono", "futures-core", @@ -2927,7 +2928,7 @@ dependencies = [ [[package]] name = "libsql" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "anyhow", "async-stream", @@ -2987,7 +2988,7 @@ dependencies = [ [[package]] name = "libsql-ffi" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "bindgen", "cc", @@ -2998,7 +2999,7 @@ dependencies = [ [[package]] name = "libsql-hrana" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "base64 0.21.7", "bytes", @@ -3009,7 +3010,7 @@ dependencies = [ [[package]] name = "libsql-rusqlite" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "bencher", "bitflags 2.6.0", @@ -3153,7 +3154,7 @@ dependencies = [ [[package]] name = "libsql-sys" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "bytes", "libsql-ffi", @@ -3184,7 +3185,7 @@ dependencies = [ [[package]] name = "libsql_replication" -version = "0.10.0-pre.2" +version = "0.10.0-pre.3" dependencies = [ "aes", "arbitrary", diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml index 59e5cb5dfa..2302c13aa9 100644 --- a/bottomless/Cargo.toml +++ b/bottomless/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.66" async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] } aws-config = { version = "1" } aws-sdk-s3 = { version = "1" } +aws-smithy-types = { version = "1" } bytes = "1" libsql-sys = { path = "../libsql-sys" } libsql_replication = { path = "../libsql-replication" } diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 6f318fd125..28e5a9dc03 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -15,6 +15,7 @@ use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder; use aws_sdk_s3::operation::list_objects::ListObjectsOutput; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::{Client, Config}; +use aws_smithy_types::timeout::TimeoutConfig; use bytes::{Buf, Bytes}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use libsql_replication::injector::Injector as _; @@ -121,6 +122,12 @@ pub struct Options { pub s3_max_parallelism: usize, /// Max number of retries for S3 operations pub s3_max_retries: u32, + /// Timeout for reading the first byte of an S3 response (seconds) + pub s3_read_timeout_secs: u64, + /// Timeout for establishing a TCP connection to S3 (seconds) + pub s3_connect_timeout_secs: u64, + /// Timeout for a single S3 operation attempt, including retries (seconds) + pub s3_operation_attempt_timeout_secs: u64, /// Skip snapshot upload per checkpoint. pub skip_snapshot: bool, /// Skip uploading snapshots on shutdown @@ -145,6 +152,11 @@ impl Options { "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY was not set" ))?; let session_token: Option = self.session_token.clone(); + let timeout_config = TimeoutConfig::builder() + .read_timeout(Duration::from_secs(self.s3_read_timeout_secs)) + .connect_timeout(Duration::from_secs(self.s3_connect_timeout_secs)) + .operation_attempt_timeout(Duration::from_secs(self.s3_operation_attempt_timeout_secs)) + .build(); let conf = loader .behavior_version(BehaviorVersion::latest()) .region(Region::new(region)) @@ -159,6 +171,7 @@ impl Options { aws_sdk_s3::config::retry::RetryConfig::standard() .with_max_attempts(self.s3_max_retries), ) + .timeout_config(timeout_config) .build(); let s3_config = aws_sdk_s3::config::Builder::from(&conf) @@ -233,6 +246,12 @@ impl Options { ), }; let s3_max_retries = env_var_or("LIBSQL_BOTTOMLESS_S3_MAX_RETRIES", 10).parse::()?; + let s3_read_timeout_secs = + env_var_or("LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS", 5).parse::()?; + let s3_connect_timeout_secs = + env_var_or("LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS", 5).parse::()?; + let s3_operation_attempt_timeout_secs = + env_var_or("LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS", 10).parse::()?; let cipher = match encryption_cipher { Some(cipher) => Cipher::from_str(&cipher)?, None => Cipher::default(), @@ -261,6 +280,9 @@ impl Options { region, bucket_name, s3_max_retries, + s3_read_timeout_secs, + s3_connect_timeout_secs, + s3_operation_attempt_timeout_secs, skip_snapshot, skip_shutdown_upload, }) diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index 70b419ebe9..e1f6f11a45 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -129,6 +129,9 @@ pub async fn metastore_connection_maker( max_batch_interval: config.backup_interval, s3_max_parallelism: 32, s3_max_retries: 10, + s3_read_timeout_secs: 5, + s3_connect_timeout_secs: 5, + s3_operation_attempt_timeout_secs: 10, skip_snapshot: false, skip_shutdown_upload: false, }; diff --git a/libsql-server/tests/bottomless/basic_restore.rs b/libsql-server/tests/bottomless/basic_restore.rs index 2c193248e6..98390618d2 100644 --- a/libsql-server/tests/bottomless/basic_restore.rs +++ b/libsql-server/tests/bottomless/basic_restore.rs @@ -36,10 +36,10 @@ async fn test_basic_restore() { .expect("Failed to cleanup dbs dir"); // Phase 3: Start sqld - should restore from minio - let endpoint2 = sqld.http_endpoint(); sqld.start(data_dir.path()) .await .expect("Failed to restart sqld"); + let endpoint2 = sqld.http_endpoint(); sqld.wait_for_ready(Duration::from_secs(60)) .await .expect("sqld did not become ready after restore"); diff --git a/libsql-server/tests/bottomless/fixtures.rs b/libsql-server/tests/bottomless/fixtures.rs index ed6d8a1401..d0730b3ff3 100644 --- a/libsql-server/tests/bottomless/fixtures.rs +++ b/libsql-server/tests/bottomless/fixtures.rs @@ -1,13 +1,73 @@ -use std::path::Path; -use std::sync::atomic::{AtomicU16, AtomicU64, Ordering}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::OnceLock; use std::time::Duration; -static PORT_COUNTER: AtomicU16 = AtomicU16::new(0); static ID_COUNTER: AtomicU64 = AtomicU64::new(0); +static TEST_IMAGE_TAG: OnceLock = OnceLock::new(); + +fn build_test_image() -> String { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let repo_root = manifest_dir + .parent() + .expect("CARGO_MANIFEST_DIR should have a parent"); + let tag = "libsql-server:test".to_string(); + + // Check if image already exists + let check_output = std::process::Command::new("docker") + .args(["images", "-q", &tag]) + .output() + .expect("failed to run docker images"); + + if !check_output.stdout.is_empty() { + return tag; + } + + let output = std::process::Command::new("docker") + .arg("build") + .arg("-t") + .arg(&tag) + .arg("-f") + .arg(repo_root.join("Dockerfile")) + .arg(repo_root) + .output() + .expect("failed to run docker build"); + + if !output.status.success() { + panic!( + "docker build failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + tag +} -fn next_port() -> u16 { - let counter = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); - 20000 + (counter % 10000) +fn get_test_image() -> &'static str { + TEST_IMAGE_TAG.get_or_init(|| build_test_image()) +} + +async fn docker_host_port(container_name: &str, container_port: u16) -> anyhow::Result { + let output = tokio::process::Command::new("docker") + .args(["port", container_name, &format!("{}", container_port)]) + .output() + .await?; + if !output.status.success() { + anyhow::bail!( + "docker port failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + let line = String::from_utf8_lossy(&output.stdout); + // Format: "0.0.0.0:49153" + let port = line + .trim() + .split(':') + .last() + .ok_or_else(|| anyhow::anyhow!("unexpected docker port output: {}", line))? + .parse::() + .map_err(|e| anyhow::anyhow!("failed to parse port from '{}': {}", line, e))?; + Ok(port) } fn unique_id() -> String { @@ -31,8 +91,6 @@ pub struct MinioFixture { impl MinioFixture { pub async fn start() -> anyhow::Result { let uid = unique_id(); - let api_port = next_port(); - let console_port = next_port(); let container_name = format!("minio-test-{}", uid); let network_name = format!("sqld-net-{}", uid); @@ -48,7 +106,7 @@ impl MinioFixture { ); } - // Start minio container + // Start minio container with random host ports let run_output = tokio::process::Command::new("docker") .args([ "run", @@ -58,9 +116,9 @@ impl MinioFixture { "--network", &network_name, "-p", - &format!("{}:9000", api_port), + ":9000", "-p", - &format!("{}:9001", console_port), + ":9001", "-e", "MINIO_ROOT_USER=minioadmin", "-e", @@ -85,6 +143,10 @@ impl MinioFixture { ); } + // Discover dynamically assigned host ports + let api_port = docker_host_port(&container_name, 9000).await?; + let console_port = docker_host_port(&container_name, 9001).await?; + // Wait for minio to be ready let client = reqwest::Client::new(); let deadline = tokio::time::Instant::now() + Duration::from_secs(30); @@ -152,10 +214,6 @@ impl MinioFixture { }) } - pub fn endpoint(&self) -> String { - format!("http://127.0.0.1:{}", self.api_port) - } - pub fn internal_endpoint(&self) -> String { format!("http://{}:9000", self.container_name) } @@ -174,7 +232,7 @@ impl MinioFixture { Ok(()) } - pub async fn restart(&self) -> anyhow::Result<()> { + pub async fn restart(&mut self) -> anyhow::Result<()> { let output = tokio::process::Command::new("docker") .args(["start", &self.container_name]) .output() @@ -185,6 +243,9 @@ impl MinioFixture { String::from_utf8_lossy(&output.stderr) ); } + // Re-discover host ports after restart + self.api_port = docker_host_port(&self.container_name, 9000).await?; + self.console_port = docker_host_port(&self.container_name, 9001).await?; // Wait for minio to be ready let client = reqwest::Client::new(); let deadline = tokio::time::Instant::now() + Duration::from_secs(30); @@ -217,18 +278,19 @@ impl MinioFixture { } } -pub struct SqldFixture<'a> { - minio: &'a MinioFixture, +pub struct SqldFixture { + network_name: String, + internal_endpoint: String, http_port: u16, pub container_name: String, } -impl<'a> SqldFixture<'a> { - pub fn new(minio: &'a MinioFixture) -> Self { - let http_port = next_port(); +impl SqldFixture { + pub fn new(minio: &MinioFixture) -> Self { Self { - minio, - http_port, + network_name: minio.network_name.clone(), + internal_endpoint: minio.internal_endpoint(), + http_port: 0, container_name: format!("sqld-test-{}", unique_id()), } } @@ -241,7 +303,6 @@ impl<'a> SqldFixture<'a> { .await; let data_dir_str = data_dir.to_str().unwrap(); - let network_name = &self.minio.network_name; let run_output = tokio::process::Command::new("docker") .args([ @@ -250,14 +311,11 @@ impl<'a> SqldFixture<'a> { "--name", &self.container_name, "--network", - network_name, + &self.network_name, "-p", - &format!("{}:8080", self.http_port), + ":8080", "-e", - &format!( - "LIBSQL_BOTTOMLESS_ENDPOINT={}", - self.minio.internal_endpoint() - ), + &format!("LIBSQL_BOTTOMLESS_ENDPOINT={}", self.internal_endpoint), "-e", "LIBSQL_BOTTOMLESS_BUCKET=bottomless", "-e", @@ -270,9 +328,15 @@ impl<'a> SqldFixture<'a> { "SQLD_ENABLE_BOTTOMLESS_REPLICATION=true", "-e", "SQLD_DB_PATH=/var/lib/sqld", + "-e", + "LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS=5", + "-e", + "LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS=5", + "-e", + "LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS=10", "-v", &format!("{}:/var/lib/sqld", data_dir_str), - "ghcr.io/tursodatabase/libsql-server:latest", + get_test_image(), ]) .output() .await?; @@ -284,6 +348,8 @@ impl<'a> SqldFixture<'a> { ); } + self.http_port = docker_host_port(&self.container_name, 8080).await?; + Ok(()) } @@ -326,6 +392,7 @@ impl<'a> SqldFixture<'a> { String::from_utf8_lossy(&output.stderr) ); } + self.http_port = docker_host_port(&self.container_name, 8080).await?; Ok(()) } diff --git a/libsql-server/tests/bottomless/minio_interrupted.rs b/libsql-server/tests/bottomless/minio_interrupted.rs index cae8d7f29b..ba8de5b3ee 100644 --- a/libsql-server/tests/bottomless/minio_interrupted.rs +++ b/libsql-server/tests/bottomless/minio_interrupted.rs @@ -5,7 +5,7 @@ use std::time::Duration; async fn test_restore_completes_after_minio_killed() { let _ = tracing_subscriber::fmt::try_init(); - let minio = MinioFixture::start().await.expect("Failed to start minio"); + let mut minio = MinioFixture::start().await.expect("Failed to start minio"); let data_dir = tempfile::tempdir().expect("Failed to create temp dir"); let mut sqld = SqldFixture::new(&minio); diff --git a/libsql-server/tests/bottomless/sqld_interrupted.rs b/libsql-server/tests/bottomless/sqld_interrupted.rs index c77b9890df..ff2ee7148b 100644 --- a/libsql-server/tests/bottomless/sqld_interrupted.rs +++ b/libsql-server/tests/bottomless/sqld_interrupted.rs @@ -51,8 +51,8 @@ async fn test_restore_completes_after_sqld_killed() { sqld.kill().await.expect("Failed to kill sqld"); // Phase 4: Restart sqld - must complete restore - let endpoint2 = sqld.http_endpoint(); sqld.restart().await.expect("Failed to restart sqld"); + let endpoint2 = sqld.http_endpoint(); sqld.wait_for_ready(Duration::from_secs(60)) .await .expect("sqld did not become ready after interrupted restore");