From 4623ec4b96fd78bfc190ae1d1ae82fd120e965ed Mon Sep 17 00:00:00 2001 From: Paul Morris <10599524+pmorris-dev@users.noreply.github.com> Date: Fri, 6 Feb 2026 09:15:00 -0500 Subject: [PATCH] sub(feat): implement the base API and logic - This PR wires everything up and adds support for: - configuring what comes back in the change message - using an async stream to get change messages - new, specific error types - running integration tests for a variety of scenarios - Note: Interop with our connection manager is coming in the next PR. --- Cargo.lock | 80 ++ crates/sqlx-sqlite-observer/Cargo.toml | 4 + crates/sqlx-sqlite-observer/README.md | 111 ++- crates/sqlx-sqlite-observer/src/broker.rs | 34 + crates/sqlx-sqlite-observer/src/config.rs | 108 +++ crates/sqlx-sqlite-observer/src/connection.rs | 161 ++++ crates/sqlx-sqlite-observer/src/error.rs | 10 +- crates/sqlx-sqlite-observer/src/lib.rs | 102 ++- crates/sqlx-sqlite-observer/src/observer.rs | 178 +++++ crates/sqlx-sqlite-observer/src/stream.rs | 81 ++ .../tests/integration_tests.rs | 716 ++++++++++++++++++ 11 files changed, 1570 insertions(+), 15 deletions(-) create mode 100644 crates/sqlx-sqlite-observer/src/config.rs create mode 100644 crates/sqlx-sqlite-observer/src/connection.rs create mode 100644 crates/sqlx-sqlite-observer/src/observer.rs create mode 100644 crates/sqlx-sqlite-observer/src/stream.rs create mode 100644 crates/sqlx-sqlite-observer/tests/integration_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 7dc4804..5b71b00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -940,6 +940,21 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -1013,6 +1028,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -2091,6 +2107,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint-dig" version = "0.8.6" @@ -3368,6 +3393,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3711,13 +3745,17 @@ dependencies = [ name = "sqlx-sqlite-observer" version = "0.8.6" dependencies = [ + "futures", "libsqlite3-sys", "parking_lot", "regex", "sqlx", + "tempfile", "thiserror 2.0.17", "tokio", + "tokio-stream", "tracing", + "tracing-subscriber", ] [[package]] @@ -4214,6 +4252,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.44" @@ -4307,6 +4354,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -4493,6 +4541,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -4657,6 +4731,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/crates/sqlx-sqlite-observer/Cargo.toml b/crates/sqlx-sqlite-observer/Cargo.toml index c6c55e7..afdae19 100644 --- a/crates/sqlx-sqlite-observer/Cargo.toml +++ b/crates/sqlx-sqlite-observer/Cargo.toml @@ -20,6 +20,7 @@ bundled = ["libsqlite3-sys/bundled"] [dependencies] tokio = { version = "1.49.0", features = ["sync"] } +tokio-stream = { version = "0.1", features = ["sync"] } thiserror = "2.0.17" tracing = { version = "0.1.44", default-features = false, features = ["std", "release_max_level_off"] } parking_lot = "0.12.3" @@ -30,3 +31,6 @@ libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] } [dev-dependencies] tokio = { version = "1.49.0", features = ["full", "macros"] } +futures = "0.3.31" +tempfile = "3.24.0" +tracing-subscriber = "0.3.22" diff --git a/crates/sqlx-sqlite-observer/README.md b/crates/sqlx-sqlite-observer/README.md index 2c13130..11ddb8e 100644 --- a/crates/sqlx-sqlite-observer/README.md +++ b/crates/sqlx-sqlite-observer/README.md @@ -98,14 +98,14 @@ This ensures subscribers **only receive notifications for committed changes**. * **`ChangeOperation`**: Insert, Update, or Delete * **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob) * **`ObserverConfig`**: Configuration for table filtering and channel - capacity + capacity -### Observer Types +### Observer Types * **`SqliteObserver`**: Main observer for `SqlitePool` connections * **`ObservableConnection`**: Connection wrapper with hooks registered -### Stream Types +### Stream Types * **`TableChangeStream`**: Async stream of table changes * **`TableChangeStreamExt`**: Extension trait for converting receivers to @@ -177,19 +177,110 @@ meaningful/correct for non-integer or composite primary keys. ## Examples -> **Coming in Phase 2** - Full working examples will be added in a subsequent PR. - ### Basic Usage - +```rust,no_run +use sqlx::SqlitePool; +use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let pool = SqlitePool::connect("sqlite:mydb.db").await?; + let observer = SqliteObserver::new(pool, ObserverConfig::default()); + + // Subscribe to changes on specific tables + let mut rx = observer.subscribe(["users"]); + + // Spawn a task to handle notifications + tokio::spawn(async move { + while let Ok(change) = rx.recv().await { + println!( + "Table {} row {} was {:?}", + change.table, + change.rowid.unwrap_or(-1), + change.operation + ); + if let Some(ColumnValue::Integer(id)) = change.primary_key.first() { + println!(" PK: {}", id); + } + } + }); + + // Use the observer to execute queries + let mut conn = observer.acquire().await?; + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind("Alice") + .execute(&mut **conn) + .await?; + + Ok(()) +} +``` ### Stream API - +```rust,no_run +use futures::StreamExt; +use sqlx::SqlitePool; +use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let pool = SqlitePool::connect("sqlite:mydb.db").await?; + let config = ObserverConfig::new().with_tables(["users", "posts"]); + let observer = SqliteObserver::new(pool, config); + + let mut stream = observer.subscribe_stream(["users"]); + + while let Some(change) = stream.next().await { + println!( + "Table {} row {} was {:?}", + change.table, + change.rowid.unwrap_or(-1), + change.operation + ); + } + + Ok(()) +} +``` ### Value Capture - +```rust,no_run +use sqlx::SqlitePool; +use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let pool = SqlitePool::connect("sqlite:mydb.db").await?; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let change = rx.recv().await?; + + // Access old/new column values + if let Some(old) = &change.old_values { + println!("Old values: {:?}", old); + } + if let Some(new) = &change.new_values { + println!("New values: {:?}", new); + } + + // Disable value capture for lower memory usage + let config = ObserverConfig::new() + .with_tables(["users"]) + .with_capture_values(false); + let observer = SqliteObserver::new( + SqlitePool::connect("sqlite:mydb.db").await?, + config, + ); + // old_values and new_values will be None + + Ok(()) +} +``` ### SQLx SQLite Connection Manager Integration @@ -204,8 +295,6 @@ buffered. All changes in a transaction are delivered at once on commit. If your transaction contains more mutating statements than this capacity, **messages will be dropped**. - - ```rust let config = ObserverConfig::new() .with_tables(["users", "posts"]) @@ -217,8 +306,6 @@ let config = ObserverConfig::new() By default, `TableChange` includes `old_values` and `new_values` with the actual column data. Disable this for lower memory usage if you only need row IDs: - - ```rust let config = ObserverConfig::new() .with_tables(["users"]) diff --git a/crates/sqlx-sqlite-observer/src/broker.rs b/crates/sqlx-sqlite-observer/src/broker.rs index 81c1a85..df3ef08 100644 --- a/crates/sqlx-sqlite-observer/src/broker.rs +++ b/crates/sqlx-sqlite-observer/src/broker.rs @@ -91,6 +91,40 @@ impl ObservationBroker { self.table_info.write().insert(table.to_string(), info); } + /// Registers multiple tables for observation without schema info. + /// + /// This is a two-phase registration: tables are marked for observation immediately, + /// but primary key extraction will return empty `Vec` until [`set_table_info`] is + /// called for each table. This is useful when you want to register tables before + /// their schema is known (e.g., before the first connection is acquired). + /// + /// **Prefer [`observe_table`] when schema info is available**, as it atomically + /// registers the table and sets schema info in one call. + /// + /// [`set_table_info`]: Self::set_table_info + /// [`observe_table`]: Self::observe_table + pub fn observe_tables(&self, tables: I) + where + I: IntoIterator, + S: AsRef, + { + let mut observed = self.observed_tables.write(); + for table in tables { + let table_name = table.as_ref().to_string(); + trace!(table = %table_name, "Observing table"); + observed.insert(table_name); + } + } + + /// Sets the schema information for an observed table. + /// + /// This information is used to extract primary key values and determine + /// whether the rowid is meaningful for the table. + pub fn set_table_info(&self, table: &str, info: TableInfo) { + trace!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Setting table info"); + self.table_info.write().insert(table.to_string(), info); + } + /// Gets the schema information for an observed table. pub fn get_table_info(&self, table: &str) -> Option { self.table_info.read().get(table).cloned() diff --git a/crates/sqlx-sqlite-observer/src/config.rs b/crates/sqlx-sqlite-observer/src/config.rs new file mode 100644 index 0000000..3d03c58 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/config.rs @@ -0,0 +1,108 @@ +use std::collections::HashSet; + +/// Configuration for the SQLite observer. +/// +/// Controls which tables are observed, the capacity of the broadcast channel +/// used to deliver change notifications to subscribers, and whether to capture +/// column values in change notifications. +#[derive(Debug, Clone)] +pub struct ObserverConfig { + /// Tables to observe for changes. + pub tables: HashSet, + + /// Capacity of the broadcast channel for change notifications. + /// + /// **Important:** All changes in a transaction are delivered at once on commit. + /// If your transaction contains more mutating SQL statements (INSERT/UPDATE/DELETE) + /// than this capacity, **messages will be dropped**. Set this value to at least + /// your largest expected transaction size. + /// + /// When messages are dropped, subscribers receive + /// [`tokio::sync::broadcast::error::RecvError::Lagged`] on their next receive: + /// + /// ```no_run + /// use tokio::sync::broadcast::error::RecvError; + /// use sqlx_sqlite_observer::TableChange; + /// + /// async fn handle_changes(mut rx: tokio::sync::broadcast::Receiver) { + /// match rx.recv().await { + /// Ok(change) => { /* process normally */ } + /// Err(RecvError::Lagged(n)) => { + /// // Missed n changes - consider re-querying full state + /// // Better yet, fix the bug by increasing channel_capacity + /// tracing::warn!("Missed {} change notifications", n); + /// } + /// Err(RecvError::Closed) => { /* observer dropped */ } + /// } + /// } + /// ``` + /// + /// Default: 256. + /// + /// [`TableChange`]: crate::TableChange + pub channel_capacity: usize, + + /// Whether to capture column values in change notifications. + /// + /// When `true` (default), [`TableChange`] includes `old_values` and `new_values` + /// with the actual column data before/after the change. When `false`, these + /// fields are `None`, reducing memory usage per notification. + /// + /// **Note:** This affects memory per message, not overflow likelihood. Overflow + /// is determined by the *count* of messages, not their size. + /// + /// Set to `false` if you only need to know *which* rows changed (table + rowid) + /// and will re-query the data yourself. + /// + /// [`TableChange`]: crate::TableChange + pub capture_values: bool, +} + +impl Default for ObserverConfig { + fn default() -> Self { + Self { + tables: HashSet::new(), + channel_capacity: 256, + capture_values: true, + } + } +} + +impl ObserverConfig { + /// Creates a new observer configuration with default settings. + /// + /// Defaults: no tables observed, channel capacity of 256, value capture enabled. + pub fn new() -> Self { + Self::default() + } + + /// Sets the tables to observe for changes. + /// + /// Only changes to these tables will generate notifications. + pub fn with_tables(mut self, tables: I) -> Self + where + I: IntoIterator, + S: Into, + { + self.tables = tables.into_iter().map(Into::into).collect(); + self + } + + /// Sets the broadcast channel capacity for change notifications. + /// + /// See [`channel_capacity`](Self::channel_capacity) for details on sizing. + pub fn with_channel_capacity(mut self, capacity: usize) -> Self { + self.channel_capacity = capacity; + self + } + + /// Controls whether column values are captured in change notifications. + /// + /// When enabled (default), `TableChange` will include `old_values` and + /// `new_values` with the actual column data. When disabled, these fields + /// will be `None`, which is faster but provides less information. + pub fn with_capture_values(mut self, capture: bool) -> Self { + self.capture_values = capture; + self + } +} diff --git a/crates/sqlx-sqlite-observer/src/connection.rs b/crates/sqlx-sqlite-observer/src/connection.rs new file mode 100644 index 0000000..2cba242 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/connection.rs @@ -0,0 +1,161 @@ +//! Observable connection wrapper with SQLite hook integration. +//! +//! Provides change tracking via SQLite's native preupdate/commit/rollback hooks +//! instead of triggers. Changes are buffered during transactions and only +//! published to subscribers after successful commit. + +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +use libsqlite3_sys::sqlite3; +use sqlx::Sqlite; +use sqlx::pool::PoolConnection; +use tracing::{debug, trace}; + +use crate::Result; +use crate::broker::ObservationBroker; +use crate::hooks; + +/// A wrapper around a SQLite pool connection allowing observers to subscribe to +/// change notifications. +/// +/// Uses SQLite's native hooks (preupdate_hook, commit_hook, rollback_hook) +/// for transaction-safe change tracking. Changes are buffered during transactions +/// and published to subscribers only after successful commit. +/// +/// Implements `Deref`/`DerefMut` to allow transparent use as the underlying +/// `PoolConnection`. +pub struct ObservableConnection { + conn: Option>, + broker: Arc, + hooks_registered: bool, + /// Raw sqlite3 pointer, cached during register_hooks so we can + /// call unregister_hooks synchronously in Drop without needing + /// the async lock_handle. + raw_db: Option<*mut sqlite3>, +} + +// SAFETY: The raw_db pointer is only used for hook registration/unregistration +// and is always accessed from the same logical owner. The underlying sqlite3 +// connection is already Send via sqlx's PoolConnection. +unsafe impl Send for ObservableConnection {} + +impl ObservableConnection { + pub(crate) fn new(conn: PoolConnection, broker: Arc) -> Self { + Self { + conn: Some(conn), + broker, + hooks_registered: false, + raw_db: None, + } + } + + fn conn_mut(&mut self) -> &mut PoolConnection { + self.conn.as_mut().expect("connection already taken") + } + + fn conn_ref(&self) -> &PoolConnection { + self.conn.as_ref().expect("connection already taken") + } + + /// Registers SQLite observation hooks on this connection. + /// + /// This must be called before any changes can be tracked. The hooks capture + /// changes during transactions and publish them on commit. + /// + /// # Safety + /// + /// This method accesses the raw SQLite connection handle. It is safe as long + /// as the connection is not being used concurrently from another thread. + pub async fn register_hooks(&mut self) -> Result<()> { + if self.hooks_registered { + return Ok(()); + } + + debug!("Registering SQLite observation hooks"); + + let conn = self.conn.as_mut().expect("connection already taken"); + + // Get raw SQLite handle through sqlx's lock mechanism + let mut handle = conn + .lock_handle() + .await + .map_err(|e| crate::Error::Database(format!("Failed to lock connection handle: {}", e)))?; + + let db: *mut sqlite3 = handle.as_raw_handle().as_ptr(); + + unsafe { + hooks::register_hooks(db, Arc::clone(&self.broker))?; + } + + // Cache the raw pointer so Drop can call unregister_hooks synchronously. + // SAFETY: The pointer remains valid for the lifetime of the PoolConnection, + // which we own via self.conn. + self.raw_db = Some(db); + self.hooks_registered = true; + Ok(()) + } + + /// Consumes this wrapper and returns the underlying pool connection. + /// + /// Hooks are unregistered before returning the connection, so it can be + /// safely returned to the pool or used without observation. + pub fn into_inner(mut self) -> PoolConnection { + // Unregister hooks before returning the connection to prevent + // use-after-free if the broker is dropped before the pooled connection is reused. + if self.hooks_registered + && let Some(db) = self.raw_db + { + unsafe { + crate::hooks::unregister_hooks(db); + } + trace!("Hooks unregistered before returning inner connection"); + } + self.hooks_registered = false; + self.raw_db = None; + // Safety: conn is always Some until this method consumes self + self.conn.take().unwrap() + } +} + +impl Drop for ObservableConnection { + fn drop(&mut self) { + if self.hooks_registered + && let Some(db) = self.raw_db + { + // SAFETY: db was obtained from lock_handle during register_hooks and + // remains valid because we still own the PoolConnection (self.conn). + // The connection has not been taken (into_inner clears hooks_registered). + unsafe { + hooks::unregister_hooks(db); + } + trace!("ObservableConnection dropped, hooks unregistered"); + } + } +} + +impl Deref for ObservableConnection { + type Target = PoolConnection; + + fn deref(&self) -> &Self::Target { + self.conn_ref() + } +} + +impl DerefMut for ObservableConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + self.conn_mut() + } +} + +impl AsRef> for ObservableConnection { + fn as_ref(&self) -> &PoolConnection { + self.conn_ref() + } +} + +impl AsMut> for ObservableConnection { + fn as_mut(&mut self) -> &mut PoolConnection { + self.conn_mut() + } +} diff --git a/crates/sqlx-sqlite-observer/src/error.rs b/crates/sqlx-sqlite-observer/src/error.rs index 06ad3c8..6d4baee 100644 --- a/crates/sqlx-sqlite-observer/src/error.rs +++ b/crates/sqlx-sqlite-observer/src/error.rs @@ -8,9 +8,17 @@ pub enum Error { HookRegistration(String), /// SQLx database error. - #[error("Database error: {0}")] + #[error("SQLx error: {0}")] Sqlx(#[from] sqlx::Error), + /// Failed to acquire connection from pool. + #[error("Failed to acquire connection from pool")] + PoolAcquire, + + /// Database error (non-sqlx). + #[error("Database error: {0}")] + Database(String), + /// Schema mismatch - table schema changed while observing. #[error( "Schema mismatch for table '{table}': expected {expected} PK columns, but only {actual} values available" diff --git a/crates/sqlx-sqlite-observer/src/lib.rs b/crates/sqlx-sqlite-observer/src/lib.rs index 5320a63..cd881e4 100644 --- a/crates/sqlx-sqlite-observer/src/lib.rs +++ b/crates/sqlx-sqlite-observer/src/lib.rs @@ -19,19 +19,117 @@ //! 2. **Provide your own SQLite** with `SQLITE_ENABLE_PREUPDATE_HOOK` compiled in. //! Use [`is_preupdate_hook_enabled()`] to verify at runtime. //! -//! # Status +//! If preupdate hooks are not available, [`SqliteObserver::acquire()`] will return +//! an error with a descriptive message. //! -//! This crate is in early development. See the README for the planned API. +//! # Features +//! +//! - **Transaction-safe notifications** - changes only notify after successful commit +//! - **Typed column values** - access old/new values with native SQLite types +//! - **Stream support** - use `tokio_stream::Stream` for async iteration +//! - **Multiple subscribers** - broadcast channel supports multiple listeners +//! +//! # Basic Example +//! +//! ```rust,no_run +//! use sqlx::SqlitePool; +//! use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let pool = SqlitePool::connect("sqlite:mydb.db").await?; +//! let observer = SqliteObserver::new(pool, ObserverConfig::default()); +//! +//! // Subscribe to changes on specific tables +//! let mut rx = observer.subscribe(["users"]); +//! +//! // Spawn a task to handle notifications +//! tokio::spawn(async move { +//! while let Ok(change) = rx.recv().await { +//! println!( +//! "Table {} row {} was {:?}", +//! change.table, +//! change.rowid.unwrap_or(-1), +//! change.operation +//! ); +//! // Access typed column values +//! if let Some(old) = &change.old_values { +//! println!(" Old values: {:?}", old); +//! } +//! if let Some(new) = &change.new_values { +//! println!(" New values: {:?}", new); +//! } +//! } +//! }); +//! +//! // Use the observer to execute queries +//! let mut conn = observer.acquire().await?; +//! sqlx::query("INSERT INTO users (name) VALUES (?)") +//! .bind("Alice") +//! .execute(&mut **conn) +//! .await?; +//! +//! // Changes are published automatically when the transaction commits +//! drop(conn); +//! +//! Ok(()) +//! } +//! ``` +//! +//! # Stream Example +//! +//! ```rust,no_run +//! use futures::StreamExt; +//! use sqlx::SqlitePool; +//! use sqlx_sqlite_observer::{ChangeOperation, SqliteObserver, ObserverConfig, TableChangeStreamExt}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let pool = SqlitePool::connect("sqlite:mydb.db").await?; +//! let config = ObserverConfig::new().with_tables(["users", "posts"]); +//! let observer = SqliteObserver::new(pool, config); +//! +//! // Get a Stream instead of broadcast::Receiver +//! let mut stream = observer.subscribe_stream(["users"]); +//! +//! // Use standard Stream combinators +//! while let Some(change) = stream.next().await { +//! println!( +//! "Table {} row {} was {:?}", +//! change.table, +//! change.rowid.unwrap_or(-1), +//! change.operation +//! ); +//! // Access typed column values +//! if let Some(old) = &change.old_values { +//! println!(" Old values: {:?}", old); +//! } +//! if let Some(new) = &change.new_values { +//! println!(" New values: {:?}", new); +//! } +//! } +//! +//! Ok(()) +//! } +//! ``` pub mod broker; pub mod change; +pub mod config; +pub mod connection; pub mod error; pub mod hooks; +pub mod observer; pub mod schema; +pub mod stream; pub use broker::ObservationBroker; pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; +pub use config::ObserverConfig; +pub use connection::ObservableConnection; pub use error::Error; pub use hooks::{SqliteValue, is_preupdate_hook_enabled, unregister_hooks}; +pub use observer::SqliteObserver; +pub use stream::{TableChangeStream, TableChangeStreamExt}; pub type Result = std::result::Result; diff --git a/crates/sqlx-sqlite-observer/src/observer.rs b/crates/sqlx-sqlite-observer/src/observer.rs new file mode 100644 index 0000000..e0cf5a1 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/observer.rs @@ -0,0 +1,178 @@ +//! SQLite observer with transaction-safe change notifications. +//! +//! Uses SQLite's native hooks for change detection. + +use std::sync::Arc; + +use sqlx::SqlitePool; +use tokio::sync::broadcast; +use tracing::{debug, warn}; + +use crate::Result; +use crate::broker::ObservationBroker; +use crate::change::TableChange; +use crate::config::ObserverConfig; +use crate::connection::ObservableConnection; +use crate::error::Error; +use crate::schema::query_table_info; + +/// SQLite database observer with transaction-safe change notifications. +/// +/// Uses SQLite's native preupdate_hook, commit_hook, and rollback_hook for +/// change detection. Changes are buffered during transactions and only +/// published to subscribers after successful commit. Rolled-back transactions +/// produce no notifications. +/// +/// # SQLite Version Requirements +/// +/// Requires SQLite library compiled with `SQLITE_ENABLE_PREUPDATE_HOOK`. +pub struct SqliteObserver { + pool: SqlitePool, + broker: Arc, + config: ObserverConfig, +} + +impl SqliteObserver { + /// Creates a new observer for the given connection pool. + /// + /// Tables specified in the config will be automatically observed. + pub fn new(pool: SqlitePool, config: ObserverConfig) -> Self { + let broker = ObservationBroker::new(config.channel_capacity, config.capture_values); + + if !config.tables.is_empty() { + broker.observe_tables(config.tables.iter().map(String::as_str)); + } + + Self { + pool, + broker, + config, + } + } + + /// Subscribes to change notifications for the specified tables. + /// + /// If additional tables are provided, they will be added to the observed set. + /// Returns a broadcast receiver that will receive `TableChange` events + /// after transactions commit. + pub fn subscribe(&self, tables: I) -> broadcast::Receiver + where + I: IntoIterator, + S: Into, + { + let tables: Vec = tables.into_iter().map(Into::into).collect(); + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + self.broker.subscribe() + } + + /// Subscribes to change notifications as a Stream. + /// + /// Returns a `TableChangeStream` that implements `futures::Stream`. + /// If tables are specified, the stream will only yield changes for those tables. + pub fn subscribe_stream(&self, tables: I) -> crate::stream::TableChangeStream + where + I: IntoIterator, + S: Into, + { + use crate::stream::TableChangeStreamExt; + let tables: Vec = tables.into_iter().map(Into::into).collect(); + // Register tables for observation (uses references, avoids clone) + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + let rx = self.broker.subscribe(); + let stream = rx.into_stream(); + if tables.is_empty() { + stream + } else { + stream.filter_tables(tables) + } + } + + /// Acquires a connection from the pool with observation hooks registered. + /// + /// The returned connection will track changes to observed tables. Changes + /// are buffered during transactions and published to subscribers after commit. + /// + /// On first acquisition for each table, queries the schema to determine + /// primary key columns and WITHOUT ROWID status. + pub async fn acquire(&self) -> Result { + let conn = self.pool.acquire().await.map_err(|_| Error::PoolAcquire)?; + let mut observable = ObservableConnection::new(conn, Arc::clone(&self.broker)); + + // Query table info for any observed tables that don't have it yet + self.ensure_table_info(&mut observable).await?; + + observable.register_hooks().await?; + debug!("Acquired observable connection with hooks registered"); + Ok(observable) + } + + /// Ensures TableInfo is set for all observed tables. + async fn ensure_table_info(&self, conn: &mut ObservableConnection) -> Result<()> { + let observed = self.broker.get_observed_tables(); + + for table in observed { + if self.broker.get_table_info(&table).is_none() { + match query_table_info(conn, &table).await { + Ok(Some(info)) => { + debug!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Queried table info"); + self.broker.set_table_info(&table, info); + } + Ok(None) => { + warn!(table = %table, "Table not found in schema"); + } + Err(e) => { + warn!(table = %table, error = %e, "Failed to query table info"); + } + } + } + } + + Ok(()) + } + + /// Acquires a connection and registers additional tables for observation. + /// + /// The specified tables are added to the observed set before acquiring. + pub async fn acquire_and_observe(&self, tables: &[&str]) -> Result { + self.broker.observe_tables(tables.iter().copied()); + self.acquire().await + } + + /// Returns a reference to the underlying connection pool. + pub fn pool(&self) -> &SqlitePool { + &self.pool + } + + /// Returns a reference to the observer configuration. + pub fn config(&self) -> &ObserverConfig { + &self.config + } + + /// Returns a list of tables currently being observed. + pub fn observed_tables(&self) -> Vec { + self.broker.get_observed_tables() + } + + /// Returns a reference to the underlying observation broker. + pub fn broker(&self) -> &Arc { + &self.broker + } +} + +impl Clone for SqliteObserver { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + broker: Arc::clone(&self.broker), + config: self.config.clone(), + } + } +} diff --git a/crates/sqlx-sqlite-observer/src/stream.rs b/crates/sqlx-sqlite-observer/src/stream.rs new file mode 100644 index 0000000..3e33525 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/stream.rs @@ -0,0 +1,81 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::sync::broadcast; +use tokio_stream::Stream; +use tokio_stream::wrappers::BroadcastStream; +use tracing::warn; + +use crate::change::TableChange; + +/// A filtered stream of table change notifications. +/// +/// Wraps a `BroadcastStream` with optional table filtering. Uses proper async +/// wakeups instead of busy-polling. +pub struct TableChangeStream { + inner: BroadcastStream, + filter_tables: Option>, +} + +impl TableChangeStream { + pub fn new(rx: broadcast::Receiver) -> Self { + Self { + inner: BroadcastStream::new(rx), + filter_tables: None, + } + } + + pub fn filter_tables(mut self, tables: Vec) -> Self { + self.filter_tables = Some(tables); + self + } +} + +impl Stream for TableChangeStream { + type Item = TableChange; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + // BroadcastStream is Unpin, so we can safely create a pinned reference + let inner = Pin::new(&mut self.inner); + + match inner.poll_next(cx) { + Poll::Ready(Some(Ok(change))) => { + if let Some(ref tables) = self.filter_tables + && !tables.contains(&change.table) + { + continue; + } + return Poll::Ready(Some(change)); + } + Poll::Ready(Some(Err(err))) => { + // Lagged error - missed some messages due to slow consumption + warn!( + error = %err, + "Stream lagged — missed change notifications. Consider increasing channel_capacity." + ); + continue; + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } +} + +/// Extension trait for converting broadcast receivers into table change streams. +/// +/// Provides a convenient way to convert a `broadcast::Receiver` into +/// a `TableChangeStream` that implements `futures::Stream`. +pub trait TableChangeStreamExt { + /// Converts this receiver into a `TableChangeStream`. + /// + /// The returned stream can be further filtered using [`TableChangeStream::filter_tables`]. + fn into_stream(self) -> TableChangeStream; +} + +impl TableChangeStreamExt for broadcast::Receiver { + fn into_stream(self) -> TableChangeStream { + TableChangeStream::new(self) + } +} diff --git a/crates/sqlx-sqlite-observer/tests/integration_tests.rs b/crates/sqlx-sqlite-observer/tests/integration_tests.rs new file mode 100644 index 0000000..f993be8 --- /dev/null +++ b/crates/sqlx-sqlite-observer/tests/integration_tests.rs @@ -0,0 +1,716 @@ +//! Integration tests for hooks-based SQLite observation. +//! +//! Tests verify: +//! - Transaction semantics: only committed changes publish notifications +//! - CRUD notifications: insert, update, delete each trigger appropriately +//! - Value capture: old/new column values are captured per operation type +//! - Filtering: only observed tables trigger notifications +//! - Multi-subscriber: all subscribers receive notifications + +use futures::StreamExt; +use sqlx::SqlitePool; +use sqlx_sqlite_observer::{ChangeOperation, ColumnValue, ObserverConfig, SqliteObserver}; +use std::time::Duration; +use tokio::time::timeout; + +async fn setup_test_db() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + + sqlx::query( + r#" + CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL + ) + "#, + ) + .execute(&pool) + .await + .unwrap(); + + sqlx::query( + r#" + CREATE TABLE posts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + title TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) + ) + "#, + ) + .execute(&pool) + .await + .unwrap(); + + pool +} + +fn has_text_value(values: &[ColumnValue], expected: &str) -> bool { + values + .iter() + .any(|v| matches!(v, ColumnValue::Text(s) if s == expected)) +} + +// ============================================================================ +// Observer Lifecycle +// ============================================================================ + +#[tokio::test] +async fn test_observer_starts_with_no_tables() { + let pool = setup_test_db().await; + let observer = SqliteObserver::new(pool, ObserverConfig::default()); + + assert!(observer.observed_tables().is_empty()); +} + +#[tokio::test] +async fn test_subscribe_adds_tables_to_observed_set() { + let pool = setup_test_db().await; + let observer = SqliteObserver::new(pool, ObserverConfig::default()); + + let _rx = observer.subscribe(["users", "posts"]); + + let tables = observer.observed_tables(); + assert_eq!(tables.len(), 2); + assert!(tables.contains(&"users".to_string())); + assert!(tables.contains(&"posts".to_string())); +} + +#[tokio::test] +async fn test_config_presets_observed_tables() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + assert_eq!(observer.observed_tables().len(), 1); + assert!(observer.observed_tables().contains(&"users".to_string())); +} + +// ============================================================================ +// Transaction Semantics +// ============================================================================ + +#[tokio::test] +async fn test_commit_publishes_notification() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Should receive notification after commit"); + + let change = result.unwrap().unwrap(); + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Insert)); +} + +#[tokio::test] +async fn test_rollback_discards_changes() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("ROLLBACK").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!( + result.is_err(), + "Should NOT receive notification after rollback" + ); +} + +#[tokio::test] +async fn test_multiple_changes_in_transaction() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("INSERT INTO users (name) VALUES ('Bob')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("INSERT INTO users (name) VALUES ('Charlie')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + // Should receive all three notifications + for expected in ["Alice", "Bob", "Charlie"] { + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!( + result.is_ok(), + "Should receive notification for {}", + expected + ); + let change = result.unwrap().unwrap(); + assert!(has_text_value( + change.new_values.as_ref().unwrap(), + expected + )); + } +} + +// ============================================================================ +// CRUD Operations +// ============================================================================ + +#[tokio::test] +async fn test_insert_notification() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + // Implicit transaction (auto-commit) + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Insert)); + assert!(change.rowid.is_some()); + assert!(change.old_values.is_none(), "INSERT has no old_values"); + assert!(change.new_values.is_some(), "INSERT has new_values"); + assert!(has_text_value(change.new_values.as_ref().unwrap(), "Alice")); +} + +#[tokio::test] +async fn test_update_notification() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + // Seed data + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(observer.pool()) + .await + .unwrap(); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("UPDATE users SET name = 'Bob' WHERE id = 1") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Update)); + assert!(change.old_values.is_some(), "UPDATE has old_values"); + assert!(change.new_values.is_some(), "UPDATE has new_values"); + assert!(has_text_value(change.old_values.as_ref().unwrap(), "Alice")); + assert!(has_text_value(change.new_values.as_ref().unwrap(), "Bob")); +} + +#[tokio::test] +async fn test_delete_notification() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + // Seed data + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(observer.pool()) + .await + .unwrap(); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + // Implicit transaction (auto-commit) + sqlx::query("DELETE FROM users WHERE id = 1") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Delete)); + assert!(change.old_values.is_some(), "DELETE has old_values"); + assert!(change.new_values.is_none(), "DELETE has no new_values"); + assert!(has_text_value(change.old_values.as_ref().unwrap(), "Alice")); +} + +// ============================================================================ +// Filtering +// ============================================================================ + +#[tokio::test] +async fn test_untracked_table_ignored() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); // Only users, not posts + let observer = SqliteObserver::new(pool, config); + + // Seed user for foreign key + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(observer.pool()) + .await + .unwrap(); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO posts (user_id, title) VALUES (1, 'Hello')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "Should NOT notify for untracked table"); +} + +// ============================================================================ +// Multi-Subscriber & Clone +// ============================================================================ + +#[tokio::test] +async fn test_all_subscribers_receive_notification() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx1 = observer.subscribe(["users"]); + let mut rx2 = observer.subscribe(["users"]); + + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result1 = timeout(Duration::from_millis(100), rx1.recv()).await; + let result2 = timeout(Duration::from_millis(100), rx2.recv()).await; + + assert!(result1.is_ok(), "Subscriber 1 receives notification"); + assert!(result2.is_ok(), "Subscriber 2 receives notification"); +} + +#[tokio::test] +async fn test_cloned_observer_shares_state() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer1 = SqliteObserver::new(pool, config); + let observer2 = observer1.clone(); + + // Subscribe on original, write through clone + let mut rx = observer1.subscribe(["users"]); + let mut conn = observer2.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Receives notification through clone"); +} + +// ============================================================================ +// Stream API +// ============================================================================ + +#[tokio::test] +async fn test_stream_receives_notifications() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut stream = observer.subscribe_stream(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(100), stream.next()).await; + assert!(result.is_ok(), "Stream receives notification"); + + let change = result.unwrap().unwrap(); + assert_eq!(change.table, "users"); +} + +#[tokio::test] +async fn test_stream_filters_tables() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users", "posts"]); + let observer = SqliteObserver::new(pool, config); + + // Seed user for foreign key + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(observer.pool()) + .await + .unwrap(); + + // Subscribe only to users, not posts + let mut stream = observer.subscribe_stream(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + // Insert into posts (should be filtered out by stream) + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO posts (user_id, title) VALUES (1, 'Hello')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let result = timeout(Duration::from_millis(50), stream.next()).await; + assert!(result.is_err(), "Stream filters out non-subscribed tables"); +} + +// ============================================================================ +// Value Capture +// ============================================================================ + +#[tokio::test] +async fn test_column_value_types() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('TestUser')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + let values = change.new_values.unwrap(); + + let has_integer = values.iter().any(|v| matches!(v, ColumnValue::Integer(_))); + let has_text = values.iter().any(|v| matches!(v, ColumnValue::Text(_))); + + assert!(has_integer, "Should capture Integer (id column)"); + assert!(has_text, "Should capture Text (name column)"); +} + +#[tokio::test] +async fn test_capture_values_disabled() { + let pool = setup_test_db().await; + let config = ObserverConfig::new() + .with_tables(["users"]) + .with_capture_values(false); + + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + // With capture_values=false, we still get table/operation/rowid but no values + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Insert)); + assert!(change.rowid.is_some()); + assert!( + change.old_values.is_none(), + "No values when capture disabled" + ); + assert!( + change.new_values.is_none(), + "No values when capture disabled" + ); +} + +// ============================================================================ +// Primary Key Extraction +// ============================================================================ + +#[tokio::test] +async fn test_single_column_primary_key() { + let pool = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert!(!change.primary_key.is_empty(), "Should have primary key"); + assert_eq!(change.primary_key.len(), 1, "Single-column PK"); + + // The PK should be the auto-incremented id (1) + assert_eq!( + change.primary_key[0], + ColumnValue::Integer(1), + "PK should be id=1" + ); +} + +#[tokio::test] +async fn test_composite_primary_key() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + + // Create a table with a composite primary key + sqlx::query( + r#" + CREATE TABLE user_roles ( + user_id INTEGER NOT NULL, + role_id INTEGER NOT NULL, + granted_at TEXT, + PRIMARY KEY (user_id, role_id) + ) + "#, + ) + .execute(&pool) + .await + .unwrap(); + + let config = ObserverConfig::new().with_tables(["user_roles"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["user_roles"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query( + "INSERT INTO user_roles (user_id, role_id, granted_at) VALUES (42, 7, '2024-01-01')", + ) + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "user_roles"); + assert_eq!(change.primary_key.len(), 2, "Composite PK has 2 columns"); + + // PK columns should be in declaration order: (user_id, role_id) + assert_eq!( + change.primary_key[0], + ColumnValue::Integer(42), + "First PK column is user_id=42" + ); + assert_eq!( + change.primary_key[1], + ColumnValue::Integer(7), + "Second PK column is role_id=7" + ); +} + +#[tokio::test] +async fn test_text_primary_key() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + + // Create a table with a TEXT primary key + sqlx::query( + r#" + CREATE TABLE settings ( + key TEXT PRIMARY KEY, + value TEXT + ) + "#, + ) + .execute(&pool) + .await + .unwrap(); + + let config = ObserverConfig::new().with_tables(["settings"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["settings"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("INSERT INTO settings (key, value) VALUES ('theme', 'dark')") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "settings"); + assert_eq!(change.primary_key.len(), 1, "Single TEXT PK"); + assert_eq!( + change.primary_key[0], + ColumnValue::Text("theme".to_string()), + "PK should be key='theme'" + ); +} + +#[tokio::test] +async fn test_without_rowid_table() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + + // Create a WITHOUT ROWID table + sqlx::query( + r#" + CREATE TABLE kv_store ( + key TEXT PRIMARY KEY, + value BLOB + ) WITHOUT ROWID + "#, + ) + .execute(&pool) + .await + .unwrap(); + + let config = ObserverConfig::new().with_tables(["kv_store"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["kv_store"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("INSERT INTO kv_store (key, value) VALUES ('mykey', X'DEADBEEF')") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "kv_store"); + + // For WITHOUT ROWID tables, rowid should be None + assert!( + change.rowid.is_none(), + "WITHOUT ROWID table should have rowid=None" + ); + + // But primary_key should still be populated + assert_eq!(change.primary_key.len(), 1); + assert_eq!( + change.primary_key[0], + ColumnValue::Text("mykey".to_string()), + "PK should be key='mykey'" + ); +} + +#[tokio::test] +async fn test_delete_returns_old_primary_key() { + let pool = setup_test_db().await; + + // Seed data + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&pool) + .await + .unwrap(); + + let config = ObserverConfig::new().with_tables(["users"]); + let observer = SqliteObserver::new(pool, config); + + let mut rx = observer.subscribe(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + sqlx::query("DELETE FROM users WHERE id = 1") + .execute(&mut **conn) + .await + .unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.operation, Some(ChangeOperation::Delete)); + + // For DELETE, primary_key should contain the OLD key values + assert_eq!(change.primary_key.len(), 1); + assert_eq!( + change.primary_key[0], + ColumnValue::Integer(1), + "DELETE should return old PK value" + ); +}