diff --git a/Cargo.lock b/Cargo.lock index 5524e4d9..6fb297aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,9 +779,9 @@ dependencies = [ [[package]] name = "cipherstash-client" -version = "0.31.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f3b35ded7920c83e600ee0044102183af5cd307aff4d7070061e6a2d403e956" +checksum = "bd8ee3770269505a5673c268ed54421e46091374c9ad1a9163d558f704d98cf1" dependencies = [ "aes-gcm-siv", "anyhow", @@ -839,9 +839,9 @@ dependencies = [ [[package]] name = "cipherstash-config" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d277d6f9749f5bf7ae43d317c4860c73893ce4ae2ffbba5b2d5d02d078ebf4cc" +checksum = "9bd94f7826557fa825161118a5b06d1ed60a66185e4f4d5766b9e5ae39669177" dependencies = [ "serde", "thiserror 1.0.69", @@ -1181,9 +1181,9 @@ dependencies = [ [[package]] name = "cts-common" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ab8098fcbe24a615b214857f9a06c48932438c060ecc32ea9e41481adea9faf" +checksum = "b1b0b629ef1939040e09625a0c70045650149f5b6be7bd0935fc0c4a50129b54" dependencies = [ "arrayvec", "axum", @@ -5612,9 +5612,9 @@ dependencies = [ [[package]] name = "zerokms-protocol" -version = "0.8.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d250d0934f3b3071c9d7a91dd26b389c45d5b1f8653aa6660e4efee09bf3063a" +checksum = "f7fe46b7fa449d5d14d9941ca83e3e92faa44e5f15ffb4b8fcc71158571279b7" dependencies = [ "async-trait", "base64", diff --git a/Cargo.toml b/Cargo.toml index 4693bdd9..022df1c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,8 @@ debug = true [workspace.dependencies] sqltk = { version = "0.10.0" } -cipherstash-client = { version = "0.31.1" } -cts-common = { version = "0.4.0" } +cipherstash-client = { version = "0.32.2" } +cts-common = { version = "0.4.1" } thiserror = "2.0.9" tokio = { version = "1.44.2", features = ["full"] } diff --git a/mise.local.example.toml b/mise.local.example.toml index c2821e28..ab53bc9f 100644 --- a/mise.local.example.toml +++ b/mise.local.example.toml @@ -15,7 +15,7 @@ CS_CLIENT_KEY = "client-key" CS_CLIENT_ID = "client-id" # The release of EQL that the proxy tests will use and releases will be built with -CS_EQL_VERSION = "eql-2.1.8" +CS_EQL_VERSION = "eql-2.2.1" # TLS variables are required for providing TLS to Proxy's clients. # CS_TLS__TYPE can be either "Path" or "Pem" (case-sensitive). diff --git a/mise.toml b/mise.toml index d154c3ac..11b5e7fa 100644 --- a/mise.toml +++ b/mise.toml @@ -174,8 +174,39 @@ run = """ cargo nextest run --no-fail-fast --nocapture -p cipherstash-proxy-integration """ +[tasks."test:integration:setup:tls"] +description = "Setup for TLS integration tests: preflight, postgres, proxy" +run = """ +set -e + +echo +echo '###############################################' +echo '# Preflight' +echo '###############################################' +echo + +mise run test:integration:preflight + +echo +echo '###############################################' +echo '# Setup' +echo '###############################################' +echo + +mise --env tls run postgres:setup + +echo +echo '###############################################' +echo '# Start Proxy' +echo '###############################################' +echo + +mise --env tls run proxy:up proxy-tls --extra-args "--detach --wait" +mise --env tls run test:wait_for_postgres_to_quack --port 6432 --max-retries 20 --tls +""" + [tasks."test:integration:without_multitenant"] -description = "Runs integration tests excluding multitenant" +description = "Runs integration tests excluding multitenant (run test:integration:setup:tls first for standalone use)" run = """ cargo nextest run --no-fail-fast --nocapture -E 'package(cipherstash-proxy-integration) and not test(multitenant)' """ diff --git a/packages/cipherstash-proxy-integration/src/eql_regression.rs b/packages/cipherstash-proxy-integration/src/eql_regression.rs new file mode 100644 index 00000000..03949ccc --- /dev/null +++ b/packages/cipherstash-proxy-integration/src/eql_regression.rs @@ -0,0 +1,552 @@ +//! EQL Regression Tests +//! +//! These tests verify backwards compatibility with data encrypted by previous versions +//! of the proxy. This is critical for production deployments where existing data must +//! remain readable after proxy upgrades. +//! +//! ## How to use these tests: +//! +//! 1. **Generate fixtures from main branch:** +//! ``` +//! git checkout main +//! CS_GENERATE_EQL_FIXTURES=1 cargo nextest run -p cipherstash-proxy-integration eql_regression::generate +//! ``` +//! +//! 2. **Run regression tests on new branch:** +//! ``` +//! git checkout +//! cargo nextest run -p cipherstash-proxy-integration eql_regression::regression +//! ``` +//! +//! The fixtures are stored in `tests/fixtures/eql_regression/` and should be committed +//! to the repository after being generated from main. + +#[cfg(test)] +mod tests { + use crate::common::{clear, connect_with_tls, random_id, trace, PROXY}; + use serde::{Deserialize, Serialize}; + use serde_json::Value; + use std::fs; + use std::path::PathBuf; + use tokio_postgres::types::ToSql; + + const FIXTURES_DIR: &str = concat!( + env!("CARGO_MANIFEST_DIR"), + "/../../tests/fixtures/eql_regression" + ); + + /// Represents a captured EQL ciphertext for regression testing + #[derive(Debug, Serialize, Deserialize)] + struct EqlFixture { + /// Description of what this fixture tests + description: String, + /// The original plaintext value (for verification) + plaintext: Value, + /// The encrypted ciphertext as stored in the database + ciphertext: String, + /// The data type (text, jsonb, int4, etc.) + data_type: String, + } + + #[derive(Debug, Serialize, Deserialize)] + struct FixtureSet { + /// Version of the proxy that generated these fixtures + proxy_version: String, + /// Git commit hash when fixtures were generated + git_commit: String, + /// The fixtures + fixtures: Vec, + } + + fn get_database_port() -> u16 { + std::env::var("CS_DATABASE__PORT") + .map(|s| s.parse().unwrap()) + .unwrap_or(5617) // Default to TLS port + } + + /// Insert data via proxy and return the encrypted ciphertext from the database + async fn capture_encrypted_ciphertext( + column: &str, + plaintext: &(dyn ToSql + Sync), + plaintext_json: Value, + ) -> EqlFixture { + let id = random_id(); + + // Insert via proxy (will encrypt) + let proxy_client = connect_with_tls(PROXY).await; + let sql = format!("INSERT INTO encrypted (id, {column}) VALUES ($1, $2)"); + proxy_client + .execute(&sql, &[&id, plaintext]) + .await + .expect("Failed to insert via proxy"); + + // Read encrypted value directly from database (bypassing proxy) + let db_port = get_database_port(); + let db_client = connect_with_tls(db_port).await; + let sql = format!("SELECT {column}::text FROM encrypted WHERE id = $1"); + let rows = db_client + .query(&sql, &[&id]) + .await + .expect("Failed to query directly"); + + let ciphertext: String = rows[0].get(0); + + EqlFixture { + description: format!("Encrypted {column}"), + plaintext: plaintext_json, + ciphertext, + data_type: column.replace("encrypted_", ""), + } + } + + /// Insert pre-encrypted data directly into the database + async fn insert_encrypted_directly(id: i64, column: &str, ciphertext: &str) { + let db_port = get_database_port(); + let db_client = connect_with_tls(db_port).await; + + // Insert the raw ciphertext directly, casting from text to eql_v2_encrypted + let sql = format!("INSERT INTO encrypted (id, {column}) VALUES ($1, $2::eql_v2_encrypted)"); + db_client + .execute(&sql, &[&id, &ciphertext]) + .await + .expect("Failed to insert encrypted data directly"); + } + + /// Read and decrypt data via proxy + async fn decrypt_via_proxy(id: i64, column: &str) -> T + where + T: for<'a> tokio_postgres::types::FromSql<'a>, + { + let proxy_client = connect_with_tls(PROXY).await; + let sql = format!("SELECT {column} FROM encrypted WHERE id = $1"); + let rows = proxy_client + .query(&sql, &[&id]) + .await + .expect("Failed to query via proxy"); + + rows[0].get(0) + } + + fn fixtures_path() -> PathBuf { + PathBuf::from(FIXTURES_DIR) + } + + fn load_fixtures() -> Option { + let path = fixtures_path().join("fixtures.json"); + if path.exists() { + let content = fs::read_to_string(&path).expect("Failed to read fixtures file"); + Some(serde_json::from_str(&content).expect("Failed to parse fixtures")) + } else { + None + } + } + + fn save_fixtures(fixtures: &FixtureSet) { + let path = fixtures_path(); + fs::create_dir_all(&path).expect("Failed to create fixtures directory"); + + let content = serde_json::to_string_pretty(fixtures).expect("Failed to serialize fixtures"); + fs::write(path.join("fixtures.json"), content).expect("Failed to write fixtures file"); + } + + /// Generate fixtures from the current proxy version. + /// Run this on the main branch to create baseline fixtures. + /// + /// Set CS_GENERATE_EQL_FIXTURES=1 to enable fixture generation. + #[tokio::test] + async fn generate_fixtures() { + if std::env::var("CS_GENERATE_EQL_FIXTURES").is_err() { + println!("Skipping fixture generation. Set CS_GENERATE_EQL_FIXTURES=1 to generate."); + return; + } + + trace(); + clear().await; + + let mut fixtures = Vec::new(); + + // Text + let text_value = "regression test text"; + fixtures.push( + capture_encrypted_ciphertext( + "encrypted_text", + &text_value, + Value::String(text_value.to_string()), + ) + .await, + ); + + // Integer types + let int2_value: i16 = 42; + fixtures.push( + capture_encrypted_ciphertext( + "encrypted_int2", + &int2_value, + Value::Number(int2_value.into()), + ) + .await, + ); + + let int4_value: i32 = 12345; + fixtures.push( + capture_encrypted_ciphertext( + "encrypted_int4", + &int4_value, + Value::Number(int4_value.into()), + ) + .await, + ); + + let int8_value: i64 = 9876543210; + fixtures.push( + capture_encrypted_ciphertext( + "encrypted_int8", + &int8_value, + Value::Number(int8_value.into()), + ) + .await, + ); + + // Float + let float_value: f64 = std::f64::consts::PI; + fixtures.push( + capture_encrypted_ciphertext( + "encrypted_float8", + &float_value, + serde_json::json!(float_value), + ) + .await, + ); + + // Boolean + let bool_value = true; + fixtures.push( + capture_encrypted_ciphertext("encrypted_bool", &bool_value, Value::Bool(bool_value)) + .await, + ); + + // JSONB - simple object + let jsonb_value = serde_json::json!({ + "string": "hello", + "number": 42, + "nested": { + "key": "value" + }, + "array_string": ["a", "b", "c"], + "array_number": [1, 2, 3] + }); + fixtures.push( + capture_encrypted_ciphertext("encrypted_jsonb", &jsonb_value, jsonb_value.clone()) + .await, + ); + + // Get git commit for documentation + let git_commit = std::process::Command::new("git") + .args(["rev-parse", "HEAD"]) + .output() + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + + let fixture_set = FixtureSet { + proxy_version: env!("CARGO_PKG_VERSION").to_string(), + git_commit, + fixtures, + }; + + save_fixtures(&fixture_set); + println!( + "Generated {} fixtures at {:?}", + fixture_set.fixtures.len(), + fixtures_path() + ); + } + + /// Regression test: verify that data encrypted by a previous proxy version + /// can still be decrypted by the current version. + #[tokio::test] + async fn regression_decrypt_legacy_text() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + println!("No fixtures found. Run generate_fixtures on main branch first."); + println!("Set CS_GENERATE_EQL_FIXTURES=1 and run on main branch."); + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "text" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_text", &fixture.ciphertext).await; + + let decrypted: String = decrypt_via_proxy(id, "encrypted_text").await; + let expected = fixture.plaintext.as_str().unwrap(); + + assert_eq!( + decrypted, expected, + "Failed to decrypt legacy text. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_int2() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "int2" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_int2", &fixture.ciphertext).await; + + let decrypted: i16 = decrypt_via_proxy(id, "encrypted_int2").await; + let expected = fixture.plaintext.as_i64().unwrap() as i16; + + assert_eq!( + decrypted, expected, + "Failed to decrypt legacy int2. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_int4() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "int4" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_int4", &fixture.ciphertext).await; + + let decrypted: i32 = decrypt_via_proxy(id, "encrypted_int4").await; + let expected = fixture.plaintext.as_i64().unwrap() as i32; + + assert_eq!( + decrypted, expected, + "Failed to decrypt legacy int4. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_int8() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "int8" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_int8", &fixture.ciphertext).await; + + let decrypted: i64 = decrypt_via_proxy(id, "encrypted_int8").await; + let expected = fixture.plaintext.as_i64().unwrap(); + + assert_eq!( + decrypted, expected, + "Failed to decrypt legacy int8. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_float8() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "float8" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_float8", &fixture.ciphertext).await; + + let decrypted: f64 = decrypt_via_proxy(id, "encrypted_float8").await; + let expected = fixture.plaintext.as_f64().unwrap(); + + assert!( + (decrypted - expected).abs() < 0.0001, + "Failed to decrypt legacy float8. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_bool() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "bool" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_bool", &fixture.ciphertext).await; + + let decrypted: bool = decrypt_via_proxy(id, "encrypted_bool").await; + let expected = fixture.plaintext.as_bool().unwrap(); + + assert_eq!( + decrypted, expected, + "Failed to decrypt legacy bool. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + #[tokio::test] + async fn regression_decrypt_legacy_jsonb() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "jsonb" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_jsonb", &fixture.ciphertext).await; + + let decrypted: Value = decrypt_via_proxy(id, "encrypted_jsonb").await; + + assert_eq!( + decrypted, fixture.plaintext, + "Failed to decrypt legacy jsonb. Fixture from commit: {}", + fixture_set.git_commit + ); + } + } + + /// Test JSONB field access (-> operator) on legacy encrypted data + #[tokio::test] + async fn regression_jsonb_field_access() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "jsonb" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_jsonb", &fixture.ciphertext).await; + + // Test field access via proxy + let proxy_client = connect_with_tls(PROXY).await; + + // Access string field + let sql = "SELECT encrypted_jsonb->'string' FROM encrypted WHERE id = $1"; + let rows = proxy_client.query(sql, &[&id]).await.unwrap(); + let decrypted: Value = rows[0].get(0); + assert_eq!( + decrypted, fixture.plaintext["string"], + "Failed to access 'string' field on legacy jsonb" + ); + + // Access number field + let sql = "SELECT encrypted_jsonb->'number' FROM encrypted WHERE id = $1"; + let rows = proxy_client.query(sql, &[&id]).await.unwrap(); + let decrypted: Value = rows[0].get(0); + assert_eq!( + decrypted, fixture.plaintext["number"], + "Failed to access 'number' field on legacy jsonb" + ); + + // Access nested field + let sql = "SELECT encrypted_jsonb->'nested' FROM encrypted WHERE id = $1"; + let rows = proxy_client.query(sql, &[&id]).await.unwrap(); + let decrypted: Value = rows[0].get(0); + assert_eq!( + decrypted, fixture.plaintext["nested"], + "Failed to access 'nested' field on legacy jsonb" + ); + } + } + + /// Test JSONB array operations on legacy encrypted data + #[tokio::test] + async fn regression_jsonb_array_operations() { + trace(); + + let Some(fixture_set) = load_fixtures() else { + return; + }; + + clear().await; + + for fixture in &fixture_set.fixtures { + if fixture.data_type != "jsonb" { + continue; + } + + let id = random_id(); + insert_encrypted_directly(id, "encrypted_jsonb", &fixture.ciphertext).await; + + let proxy_client = connect_with_tls(PROXY).await; + + // Access array field + let sql = "SELECT encrypted_jsonb->'array_number' FROM encrypted WHERE id = $1"; + let rows = proxy_client.query(sql, &[&id]).await.unwrap(); + let decrypted: Value = rows[0].get(0); + assert_eq!( + decrypted, fixture.plaintext["array_number"], + "Failed to access 'array_number' field on legacy jsonb" + ); + } + } +} diff --git a/packages/cipherstash-proxy-integration/src/lib.rs b/packages/cipherstash-proxy-integration/src/lib.rs index 611e4b3a..da1f0a47 100644 --- a/packages/cipherstash-proxy-integration/src/lib.rs +++ b/packages/cipherstash-proxy-integration/src/lib.rs @@ -3,6 +3,7 @@ mod decrypt; mod disable_mapping; mod empty_result; mod encryption_sanity; +mod eql_regression; mod extended_protocol_error_messages; mod insert; mod map_concat; diff --git a/packages/cipherstash-proxy/src/error.rs b/packages/cipherstash-proxy/src/error.rs index 8faa0c58..afd37b69 100644 --- a/packages/cipherstash-proxy/src/error.rs +++ b/packages/cipherstash-proxy/src/error.rs @@ -315,69 +315,72 @@ pub enum EncryptError { // This impl is very boilerplatey but we can't simply re-export the `cipherstash-client` version of the error // because Proxy currently manages the documentation links. -impl From for EncryptError { - fn from(value: cipherstash_client::eql::EncryptError) -> Self { +impl From for EncryptError { + fn from(value: cipherstash_client::eql::EqlError) -> Self { match value { - cipherstash_client::eql::EncryptError::CiphertextCouldNotBeSerialised(error) => { + cipherstash_client::eql::EqlError::CiphertextCouldNotBeSerialised(error) => { Self::CiphertextCouldNotBeSerialised(error) } - cipherstash_client::eql::EncryptError::ColumnCouldNotBeParsed => { + cipherstash_client::eql::EqlError::ColumnCouldNotBeParsed => { Self::ColumnCouldNotBeParsed } - cipherstash_client::eql::EncryptError::ColumnIsNull => Self::ColumnIsNull, - cipherstash_client::eql::EncryptError::ColumnCouldNotBeDeserialised { - table, - column, - } => Self::ColumnCouldNotBeDeserialised { table, column }, - cipherstash_client::eql::EncryptError::ColumnCouldNotBeEncrypted { table, column } => { + cipherstash_client::eql::EqlError::ColumnIsNull => Self::ColumnIsNull, + cipherstash_client::eql::EqlError::ColumnCouldNotBeDeserialised { table, column } => { + Self::ColumnCouldNotBeDeserialised { table, column } + } + cipherstash_client::eql::EqlError::ColumnCouldNotBeEncrypted { table, column } => { Self::ColumnCouldNotBeEncrypted { table, column } } - cipherstash_client::eql::EncryptError::ColumnConfigurationMismatch { - table, - column, - } => Self::ColumnConfigurationMismatch { table, column }, - cipherstash_client::eql::EncryptError::CouldNotDecryptDataForKeyset { keyset_id } => { + cipherstash_client::eql::EqlError::ColumnConfigurationMismatch { table, column } => { + Self::ColumnConfigurationMismatch { table, column } + } + cipherstash_client::eql::EqlError::CouldNotDecryptDataForKeyset { keyset_id } => { Self::CouldNotDecryptDataForKeyset { keyset_id } } - cipherstash_client::eql::EncryptError::InvalidIndexTerm => Self::InvalidIndexTerm, - cipherstash_client::eql::EncryptError::KeysetIdCouldNotBeParsed { id } => { - Self::KeysetIdCouldNotBeParsed { id } + cipherstash_client::eql::EqlError::InvalidIndexTerm => Self::InvalidIndexTerm, + cipherstash_client::eql::EqlError::MissingCiphertext(identifier) => { + Self::ColumnCouldNotBeDeserialised { + table: identifier.table, + column: identifier.column, + } } - cipherstash_client::eql::EncryptError::KeysetIdCouldNotBeSet => { - Self::KeysetIdCouldNotBeSet + cipherstash_client::eql::EqlError::KeysetIdCouldNotBeParsed { id } => { + Self::KeysetIdCouldNotBeParsed { id } } - cipherstash_client::eql::EncryptError::KeysetNameCouldNotBeSet => { + cipherstash_client::eql::EqlError::KeysetIdCouldNotBeSet => Self::KeysetIdCouldNotBeSet, + cipherstash_client::eql::EqlError::KeysetNameCouldNotBeSet => { Self::KeysetNameCouldNotBeSet } - cipherstash_client::eql::EncryptError::MissingEncryptConfiguration { - plaintext_type, - } => Self::MissingEncryptConfiguration { plaintext_type }, - cipherstash_client::eql::EncryptError::PlaintextCouldNotBeEncoded => { + cipherstash_client::eql::EqlError::MissingEncryptConfiguration { plaintext_type } => { + Self::MissingEncryptConfiguration { plaintext_type } + } + cipherstash_client::eql::EqlError::PlaintextCouldNotBeEncoded => { Self::PlaintextCouldNotBeEncoded } - cipherstash_client::eql::EncryptError::Pipeline(encryption_error) => { + cipherstash_client::eql::EqlError::Pipeline(encryption_error) => { Self::Pipeline(encryption_error) } - cipherstash_client::eql::EncryptError::PlaintextCouldNotBeDecoded(type_parse_error) => { + cipherstash_client::eql::EqlError::PlaintextCouldNotBeDecoded(type_parse_error) => { Self::PlaintextCouldNotBeDecoded(type_parse_error) } - cipherstash_client::eql::EncryptError::MissingKeysetIdentifier => { + cipherstash_client::eql::EqlError::MissingKeysetIdentifier => { Self::MissingKeysetIdentifier } - cipherstash_client::eql::EncryptError::UnexpectedSetKeyset => Self::UnexpectedSetKeyset, - cipherstash_client::eql::EncryptError::UnknownColumn { table, column } => { + cipherstash_client::eql::EqlError::UnexpectedSetKeyset => Self::UnexpectedSetKeyset, + cipherstash_client::eql::EqlError::UnknownColumn { table, column } => { Self::UnknownColumn { table, column } } - cipherstash_client::eql::EncryptError::UnknownKeysetIdentifier { keyset } => { + cipherstash_client::eql::EqlError::UnknownKeysetIdentifier { keyset } => { Self::UnknownKeysetIdentifier { keyset } } - cipherstash_client::eql::EncryptError::UnknownTable { table } => { + cipherstash_client::eql::EqlError::UnknownTable { table } => { Self::UnknownTable { table } } - cipherstash_client::eql::EncryptError::UnknownIndexTerm(identifier) => { + cipherstash_client::eql::EqlError::UnknownIndexTerm(identifier) => { Self::UnknownIndexTerm(identifier) } - cipherstash_client::eql::EncryptError::ZeroKMS(message) => Self::ZeroKMS(message), + cipherstash_client::eql::EqlError::ZeroKMS(err) => Self::ZeroKMS(err.to_string()), + cipherstash_client::eql::EqlError::RecordDecrypt(err) => Self::ZeroKMS(err.to_string()), } } } diff --git a/packages/cipherstash-proxy/src/lib.rs b/packages/cipherstash-proxy/src/lib.rs index 1392fc69..2d4ac8fa 100644 --- a/packages/cipherstash-proxy/src/lib.rs +++ b/packages/cipherstash-proxy/src/lib.rs @@ -15,7 +15,8 @@ pub use crate::cli::Migrate; pub use crate::config::{DatabaseConfig, ServerConfig, TandemConfig, TlsConfig}; pub use crate::log::init; pub use crate::proxy::Proxy; -pub use cipherstash_client::eql::{EqlEncrypted, ForQuery, Identifier, Plaintext}; +pub use cipherstash_client::encryption::Plaintext; +pub use cipherstash_client::eql::{EqlCiphertext, Identifier}; use std::mem; diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index 4a667835..b5113aec 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -20,7 +20,7 @@ use crate::prometheus::{ }; use crate::proxy::EncryptionService; use bytes::BytesMut; -use cipherstash_client::eql::EqlEncrypted; +use cipherstash_client::eql::EqlCiphertext; use metrics::{counter, histogram}; use std::time::Instant; use tokio::io::AsyncRead; @@ -149,16 +149,28 @@ where /// Returns `Ok(())` on successful message processing, or an `Error` if a fatal /// error occurs that should terminate the connection. pub async fn rewrite(&mut self) -> Result<(), Error> { + let read_start = Instant::now(); let (code, mut bytes) = protocol::read_message( &mut self.server_reader, self.context.client_id, self.context.connection_timeout(), ) .await?; + let read_duration = read_start.elapsed(); let sent: u64 = bytes.len() as u64; counter!(SERVER_BYTES_RECEIVED_TOTAL).increment(sent); + // Log slow database responses (>100ms is concerning) + if read_duration.as_millis() > 100 { + warn!( + client_id = self.context.client_id, + msg = "Slow database response", + duration_ms = read_duration.as_millis(), + message_code = ?code, + ); + } + if self.context.is_passthrough() { debug!(target: DEVELOPMENT, client_id = self.context.client_id, @@ -430,7 +442,7 @@ where let projection_columns = portal.projection_columns(); // Each row is converted into Vec> - let ciphertexts: Vec> = rows + let ciphertexts: Vec> = rows .iter_mut() .flat_map(|row| row.as_ciphertext(projection_columns)) .collect::>(); @@ -492,7 +504,7 @@ where fn check_column_config( &mut self, projection_columns: &[Option], - ciphertexts: &[Option], + ciphertexts: &[Option], ) -> Result<(), Error> { for (col, ct) in projection_columns.iter().zip(ciphertexts) { match (col, ct) { diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index fa2365f4..f5af5d92 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -510,7 +510,7 @@ where &self, plaintexts: Vec>, columns: &[Option], - ) -> Result>, Error> { + ) -> Result>, Error> { let keyset_id = self.keyset_identifier(); self.encryption @@ -520,7 +520,7 @@ where pub async fn decrypt( &self, - ciphertexts: Vec>, + ciphertexts: Vec>, ) -> Result>, Error> { let keyset_id = self.keyset_identifier(); self.encryption.decrypt(keyset_id, ciphertexts).await @@ -684,14 +684,14 @@ mod tests { _keyset_id: Option, _plaintexts: Vec>, _columns: &[Option], - ) -> Result>, Error> { + ) -> Result>, Error> { Ok(vec![]) } async fn decrypt( &self, _keyset_id: Option, - _ciphertexts: Vec>, + _ciphertexts: Vec>, ) -> Result>, Error> { Ok(vec![]) } diff --git a/packages/cipherstash-proxy/src/postgresql/frontend.rs b/packages/cipherstash-proxy/src/postgresql/frontend.rs index 883b8039..7d07098e 100644 --- a/packages/cipherstash-proxy/src/postgresql/frontend.rs +++ b/packages/cipherstash-proxy/src/postgresql/frontend.rs @@ -25,7 +25,7 @@ use crate::prometheus::{ STATEMENTS_PASSTHROUGH_TOTAL, STATEMENTS_UNMAPPABLE_TOTAL, }; use crate::proxy::EncryptionService; -use crate::EqlEncrypted; +use crate::EqlCiphertext; use bytes::BytesMut; use cipherstash_client::encryption::Plaintext; use eql_mapper::{self, EqlMapperError, EqlTerm, TypeCheckedStatement}; @@ -255,7 +255,8 @@ where msg = "Bind Error", err = err.to_string() ); - return Err(err); + self.send_error_response(err)?; + return Ok(()); } }, } @@ -370,6 +371,7 @@ where /// - `Ok(None)` - No transformation needed, forward original query /// - `Err(error)` - Processing failed, error should be sent to client async fn query_handler(&mut self, bytes: &BytesMut) -> Result, Error> { + let handler_start = Instant::now(); self.context.start_session(); let mut query = Query::try_from(bytes)?; @@ -478,15 +480,32 @@ where query.rewrite(transformed_statement.to_string()); let bytes = BytesMut::try_from(query)?; + let handler_duration = handler_start.elapsed(); debug!( target: MAPPER, client_id = self.context.client_id, msg = "Rewrite Query", transformed_statement = transformed_statement.to_string(), + duration_ms = handler_duration.as_millis(), bytes = ?bytes, ); + if handler_duration.as_millis() > 100 { + warn!( + client_id = self.context.client_id, + msg = "Slow query handler processing", + duration_ms = handler_duration.as_millis(), + ); + } Ok(Some(bytes)) } else { + let handler_duration = handler_start.elapsed(); + if handler_duration.as_millis() > 50 { + debug!( + client_id = self.context.client_id, + msg = "Query handler processing time", + duration_ms = handler_duration.as_millis(), + ); + } Ok(None) } } @@ -512,12 +531,12 @@ where /// # Returns /// /// Vector of encrypted values corresponding to each literal, with `None` for - /// literals that don't require encryption and `Some(EqlEncrypted)` for encrypted values. + /// literals that don't require encryption and `Some(EqlCiphertext)` for encrypted values. async fn encrypt_literals( &mut self, typed_statement: &TypeCheckedStatement<'_>, literal_columns: &Vec>, - ) -> Result>, Error> { + ) -> Result>, Error> { let literal_values = typed_statement.literal_values(); if literal_values.is_empty() { debug!(target: MAPPER, @@ -562,7 +581,7 @@ where async fn transform_statement( &mut self, typed_statement: &TypeCheckedStatement<'_>, - encrypted_literals: &Vec>, + encrypted_literals: &Vec>, ) -> Result, Error> { // Convert literals to ast Expr let mut encrypted_expressions = vec![]; @@ -920,7 +939,7 @@ where &mut self, bind: &Bind, statement: &Statement, - ) -> Result>, Error> { + ) -> Result>, Error> { let plaintexts = bind.to_plaintext(&statement.param_columns, &statement.postgres_param_types)?; diff --git a/packages/cipherstash-proxy/src/postgresql/messages/bind.rs b/packages/cipherstash-proxy/src/postgresql/messages/bind.rs index c04e3c7f..a8dbf734 100644 --- a/packages/cipherstash-proxy/src/postgresql/messages/bind.rs +++ b/packages/cipherstash-proxy/src/postgresql/messages/bind.rs @@ -8,7 +8,7 @@ use crate::postgresql::protocol::BytesMutReadString; use crate::{SIZE_I16, SIZE_I32}; use bytes::{Buf, BufMut, BytesMut}; use cipherstash_client::encryption::Plaintext; -use cipherstash_client::eql::{self, EqlEncrypted}; +use cipherstash_client::eql::EqlCiphertext; use postgres_types::Type; use std::fmt::{self, Display, Formatter}; use std::io::Cursor; @@ -81,7 +81,7 @@ impl Bind { Ok(plaintexts) } - pub fn rewrite(&mut self, encrypted: Vec>) -> Result<(), Error> { + pub fn rewrite(&mut self, encrypted: Vec>) -> Result<(), Error> { for (idx, ct) in encrypted.iter().enumerate() { if let Some(ct) = ct { let json = serde_json::to_value(ct)?; @@ -195,29 +195,6 @@ impl Display for BindParam { } } -impl From<&BindParam> for Option { - fn from(bind_param: &BindParam) -> Self { - if !bind_param.maybe_plaintext() { - return None; - } - - let bytes = bind_param.json_bytes(); - let s = std::str::from_utf8(bytes).unwrap_or(""); - - match serde_json::from_str(s) { - Ok(pt) => Some(pt), - Err(e) => { - debug!( - param = s, - error = e.to_string(), - "Failed to parse parameter" - ); - None - } - } - } -} - impl TryFrom<&BytesMut> for Bind { type Error = Error; diff --git a/packages/cipherstash-proxy/src/postgresql/messages/data_row.rs b/packages/cipherstash-proxy/src/postgresql/messages/data_row.rs index ed94a591..e512eb66 100644 --- a/packages/cipherstash-proxy/src/postgresql/messages/data_row.rs +++ b/packages/cipherstash-proxy/src/postgresql/messages/data_row.rs @@ -5,7 +5,7 @@ use crate::{ postgresql::Column, }; use bytes::{Buf, BufMut, BytesMut}; -use cipherstash_client::eql::EqlEncrypted; +use cipherstash_client::eql::EqlCiphertext; use std::io::Cursor; use tracing::{debug, error}; @@ -23,7 +23,7 @@ impl DataRow { pub fn as_ciphertext( &mut self, column_configuration: &Vec>, - ) -> Vec> { + ) -> Vec> { let mut result = vec![]; for (data_column, column_config) in self.columns.iter_mut().zip(column_configuration) { let encrypted = column_config @@ -175,7 +175,7 @@ impl TryFrom for BytesMut { } } -impl TryFrom<&mut DataColumn> for EqlEncrypted { +impl TryFrom<&mut DataColumn> for EqlCiphertext { type Error = Error; fn try_from(col: &mut DataColumn) -> Result { diff --git a/packages/cipherstash-proxy/src/proxy/encrypt_config/config.rs b/packages/cipherstash-proxy/src/proxy/encrypt_config/config.rs index 3c2d493c..13c69dfa 100644 --- a/packages/cipherstash-proxy/src/proxy/encrypt_config/config.rs +++ b/packages/cipherstash-proxy/src/proxy/encrypt_config/config.rs @@ -183,7 +183,10 @@ impl Column { } if let Some(SteVecIndexOpts { prefix }) = self.indexes.ste_vec_index { - config = config.add_index(Index::new(IndexType::SteVec { prefix })) + config = config.add_index(Index::new(IndexType::SteVec { + prefix, + term_filters: vec![], + })) } config @@ -463,7 +466,8 @@ mod tests { assert_eq!( column.indexes[0].index_type, IndexType::SteVec { - prefix: "event-data".into() + prefix: "event-data".into(), + term_filters: vec![], }, ); } diff --git a/packages/cipherstash-proxy/src/proxy/mod.rs b/packages/cipherstash-proxy/src/proxy/mod.rs index aa42a45f..7ee12a8b 100644 --- a/packages/cipherstash-proxy/src/proxy/mod.rs +++ b/packages/cipherstash-proxy/src/proxy/mod.rs @@ -156,13 +156,13 @@ pub trait EncryptionService: Send + Sync { keyset_id: Option, plaintexts: Vec>, columns: &[Option], - ) -> Result>, Error>; + ) -> Result>, Error>; /// Decrypt values retrieved from the database async fn decrypt( &self, keyset_id: Option, - ciphertexts: Vec>, + ciphertexts: Vec>, ) -> Result>, Error>; } diff --git a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs index 6961fa94..c3dddf21 100644 --- a/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs +++ b/packages/cipherstash-proxy/src/proxy/zerokms/zerokms.rs @@ -7,11 +7,17 @@ use crate::{ proxy::EncryptionService, }; use cipherstash_client::{ - encryption::{Plaintext, ReferencedPendingPipeline}, - eql::{self, decrypt_eql, encrypt_eql, EqlEncryptionSpec}, + encryption::{Plaintext, QueryOp}, + eql::{ + decrypt_eql, encrypt_eql, EqlCiphertext, EqlDecryptOpts, EqlEncryptOpts, EqlOperation, + PreparedPlaintext, + }, + schema::column::IndexType, }; +use eql_mapper::EqlTermVariant; use metrics::counter; use moka::future::Cache; +use std::borrow::Cow; use std::{sync::Arc, time::Duration}; use tracing::{debug, info, warn}; use uuid::Uuid; @@ -132,7 +138,7 @@ impl EncryptionService for ZeroKms { keyset_id: Option, plaintexts: Vec>, columns: &[Option], - ) -> Result>, Error> { + ) -> Result>, Error> { debug!(target: ENCRYPT, msg="Encrypt", ?keyset_id, default_keyset_id = ?self.default_keyset_id); // A keyset is required if no default keyset has been configured @@ -140,25 +146,76 @@ impl EncryptionService for ZeroKms { return Err(EncryptError::MissingKeysetIdentifier.into()); } - let cipher = self.init_cipher(keyset_id).await?; - let pipeline = ReferencedPendingPipeline::new(cipher.clone()); - - let encryption_specs: Vec> = columns - .iter() - .map(|col| { - col.as_ref().map(|col| { - if col.is_encryptable() { - EqlEncryptionSpec::Full(col.identifier.clone(), col.config.clone()) - } else { - EqlEncryptionSpec::SearchOnly(col.identifier.clone(), col.config.clone()) + let cipher = self.init_cipher(keyset_id.clone()).await?; + + // Collect indices and prepared plaintexts for non-None values + let mut indices: Vec = Vec::new(); + let mut prepared_plaintexts: Vec = Vec::new(); + + for (idx, (plaintext_opt, col_opt)) in plaintexts.iter().zip(columns.iter()).enumerate() { + if let (Some(plaintext), Some(col)) = (plaintext_opt, col_opt) { + // Determine the EQL operation based on the term variant + let eql_op = match col.eql_term { + // Full, Partial, and Tokenized terms store encrypted data with all indexes + EqlTermVariant::Full | EqlTermVariant::Partial | EqlTermVariant::Tokenized => { + EqlOperation::Store } - }) - }) - .collect(); - Ok(encrypt_eql(cipher, pipeline, plaintexts, &encryption_specs) + // JsonPath generates a selector term for SteVec queries (e.g., jsonb_path_query) + EqlTermVariant::JsonPath => col + .config + .indexes + .iter() + .find(|i| matches!(i.index_type, IndexType::SteVec { .. })) + .map(|index| { + EqlOperation::Query(&index.index_type, QueryOp::SteVecSelector) + }) + .unwrap_or(EqlOperation::Store), + + // JsonAccessor generates a selector for SteVec field access (-> operator) + EqlTermVariant::JsonAccessor => col + .config + .indexes + .iter() + .find(|i| matches!(i.index_type, IndexType::SteVec { .. })) + .map(|index| { + EqlOperation::Query(&index.index_type, QueryOp::SteVecSelector) + }) + .unwrap_or(EqlOperation::Store), + }; + + let prepared = PreparedPlaintext::new( + Cow::Owned(col.config.clone()), + col.identifier.clone(), + plaintext.clone(), + eql_op, + ); + indices.push(idx); + prepared_plaintexts.push(prepared); + } + } + + // If no plaintexts to encrypt, return all None + if prepared_plaintexts.is_empty() { + return Ok(vec![None; plaintexts.len()]); + } + + // Use default opts since cipher is already initialized with the correct keyset + let opts = EqlEncryptOpts::default(); + + debug!(target: ENCRYPT, msg="Calling encrypt_eql", count = prepared_plaintexts.len()); + let encrypted = encrypt_eql(cipher, prepared_plaintexts, &opts) .await - .map_err(EncryptError::from)?) + .map_err(EncryptError::from)?; + debug!(target: ENCRYPT, msg="encrypt_eql completed", count = encrypted.len()); + + // Reconstruct the result vector with None values in the right places + let mut result: Vec> = vec![None; plaintexts.len()]; + for (idx, ciphertext) in indices.into_iter().zip(encrypted.into_iter()) { + result[idx] = Some(ciphertext); + } + + Ok(result) } /// @@ -169,7 +226,7 @@ impl EncryptionService for ZeroKms { async fn decrypt( &self, keyset_id: Option, - ciphertexts: Vec>, + ciphertexts: Vec>, ) -> Result>, Error> { debug!(target: ENCRYPT, msg="Decrypt", ?keyset_id, default_keyset_id = ?self.default_keyset_id); @@ -180,10 +237,37 @@ impl EncryptionService for ZeroKms { let cipher = self.init_cipher(keyset_id.clone()).await?; - Ok( - decrypt_eql(keyset_id.map(|keyset_id| keyset_id.0), cipher, ciphertexts) - .await - .map_err(EncryptError::from)?, - ) + // Collect indices and ciphertexts for non-None values + let mut indices: Vec = Vec::new(); + let mut ciphertexts_to_decrypt: Vec = Vec::new(); + + for (idx, ct_opt) in ciphertexts.iter().enumerate() { + if let Some(ct) = ct_opt { + indices.push(idx); + ciphertexts_to_decrypt.push(ct.clone()); + } + } + + // If no ciphertexts to decrypt, return all None + if ciphertexts_to_decrypt.is_empty() { + return Ok(vec![None; ciphertexts.len()]); + } + + // Use default opts since cipher is already initialized with the correct keyset + let opts = EqlDecryptOpts::default(); + + debug!(target: ENCRYPT, msg="Calling decrypt_eql", count = ciphertexts_to_decrypt.len()); + let decrypted = decrypt_eql(cipher, ciphertexts_to_decrypt, &opts) + .await + .map_err(EncryptError::from)?; + debug!(target: ENCRYPT, msg="decrypt_eql completed", count = decrypted.len()); + + // Reconstruct the result vector with None values in the right places + let mut result: Vec> = vec![None; ciphertexts.len()]; + for (idx, plaintext) in indices.into_iter().zip(decrypted.into_iter()) { + result[idx] = Some(plaintext); + } + + Ok(result) } } diff --git a/tests/fixtures/eql_regression/.gitkeep b/tests/fixtures/eql_regression/.gitkeep new file mode 100644 index 00000000..e69de29b