diff --git a/Cargo.lock b/Cargo.lock index 5b71b00..16bae15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3741,6 +3741,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlx-sqlite-conn-mgr" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d1ee0837ee4c49036a64632adf814c1799ec54cc0ba31712d263ce12c09ad45" +dependencies = [ + "serde", + "sqlx", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "sqlx-sqlite-observer" version = "0.8.6" @@ -3750,6 +3763,7 @@ dependencies = [ "parking_lot", "regex", "sqlx", + "sqlx-sqlite-conn-mgr 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile", "thiserror 2.0.17", "tokio", @@ -4076,7 +4090,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "sqlx-sqlite-conn-mgr", + "sqlx-sqlite-conn-mgr 0.8.6", "tauri", "tauri-plugin", "tempfile", diff --git a/crates/sqlx-sqlite-observer/Cargo.toml b/crates/sqlx-sqlite-observer/Cargo.toml index afdae19..b0b30d0 100644 --- a/crates/sqlx-sqlite-observer/Cargo.toml +++ b/crates/sqlx-sqlite-observer/Cargo.toml @@ -13,6 +13,7 @@ keywords = ["sqlite", "sqlx", "reactive", "observer", "database"] categories = ["database", "asynchronous"] [features] +conn-mgr = ["dep:sqlx-sqlite-conn-mgr"] # Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK # which most system SQLite libraries don't have enabled. default = ["bundled"] @@ -28,6 +29,7 @@ regex = "1.12.3" sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false } # Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] } +sqlx-sqlite-conn-mgr = { version = "0.8.6", optional = true } [dev-dependencies] tokio = { version = "1.49.0", features = ["full", "macros"] } diff --git a/crates/sqlx-sqlite-observer/README.md b/crates/sqlx-sqlite-observer/README.md index 11ddb8e..a751792 100644 --- a/crates/sqlx-sqlite-observer/README.md +++ b/crates/sqlx-sqlite-observer/README.md @@ -95,6 +95,8 @@ This ensures subscribers **only receive notifications for committed changes**. ### Core Types * **`TableChange`**: Notification of a change to a database table + * **`TableChangeEvent`**: Event yielded by `TableChangeStream` — + either `Change(TableChange)` or `Lagged(u64)` * **`ChangeOperation`**: Insert, Update, or Delete * **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob) * **`ObserverConfig`**: Configuration for table filtering and channel @@ -111,7 +113,7 @@ This ensures subscribers **only receive notifications for committed changes**. * **`TableChangeStreamExt`**: Extension trait for converting receivers to streams -### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) +### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) * **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation * **`ObservableWriteGuard`**: Write guard with hooks registered @@ -179,7 +181,7 @@ meaningful/correct for non-integer or composite primary keys. ### Basic Usage -```rust,no_run +```rust use sqlx::SqlitePool; use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; @@ -219,10 +221,12 @@ async fn main() -> Result<(), Box> { ### Stream API -```rust,no_run +```rust use futures::StreamExt; use sqlx::SqlitePool; -use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig}; +use sqlx_sqlite_observer::{ + SqliteObserver, ObserverConfig, TableChangeEvent, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -232,13 +236,20 @@ async fn main() -> Result<(), Box> { let mut stream = observer.subscribe_stream(["users"]); - while let Some(change) = stream.next().await { - println!( - "Table {} row {} was {:?}", - change.table, - change.rowid.unwrap_or(-1), - change.operation - ); + while let Some(event) = stream.next().await { + match event { + TableChangeEvent::Change(change) => { + println!( + "Table {} row {} was {:?}", + change.table, + change.rowid.unwrap_or(-1), + change.operation + ); + } + TableChangeEvent::Lagged(n) => { + eprintln!("Missed {} notifications", n); + } + } } Ok(()) @@ -247,7 +258,7 @@ async fn main() -> Result<(), Box> { ### Value Capture -```rust,no_run +```rust use sqlx::SqlitePool; use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; @@ -284,7 +295,37 @@ async fn main() -> Result<(), Box> { ### SQLx SQLite Connection Manager Integration - +```rust +use std::sync::Arc; +use sqlx_sqlite_conn_mgr::SqliteDatabase; +use sqlx_sqlite_observer::{ + ObservableSqliteDatabase, ObserverConfig, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db = SqliteDatabase::connect("mydb.db", None).await?; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(db, config); + + let mut rx = observable.subscribe(["users"]); + + // Write through the observable writer + let mut writer = observable.acquire_writer().await?; + sqlx::query("BEGIN").execute(&mut *writer).await?; + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind("Alice") + .execute(&mut *writer) + .await?; + sqlx::query("COMMIT").execute(&mut *writer).await?; + + // Notification arrives after commit + let change = rx.recv().await?; + println!("Changed: {}", change.table); + + Ok(()) +} +``` ## Usage Notes @@ -301,6 +342,58 @@ let config = ObserverConfig::new() .with_channel_capacity(1000); // Handle large transactions ``` +### Handling Lag + +When using the Stream API, the stream yields `TableChangeEvent` values. +Most events are `Change` variants, but if a consumer falls behind, the +stream yields a `Lagged(n)` event indicating how many notifications +were missed. + +```rust +use futures::StreamExt; +use sqlx_sqlite_observer::TableChangeEvent; +# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig}; +# async fn example(observer: SqliteObserver) { + +let mut stream = observer.subscribe_stream(["users"]); + +while let Some(event) = stream.next().await { + match event { + TableChangeEvent::Change(change) => { + // Process the change normally + } + TableChangeEvent::Lagged(n) => { + // n notifications were missed — local state may be stale. + // Re-query the database for current state. + tracing::warn!("Missed {} change notifications", n); + } + } +} +# } +``` + +**When does lag happen?** The broadcast channel has a fixed capacity +(default 256). Lag occurs when the oldest unread messages are +overwritten. This can happen in two ways: + + * A subscriber processes changes slower than they arrive + * A single transaction contains more mutating statements than the + channel capacity, causing messages to be overwritten before the + consumer reads them + +This is rare under normal conditions but can occur during bulk +writes or large transactions. + +**How to prevent it:** + + * Increase `channel_capacity` via `ObserverConfig::with_channel_capacity` + * Process changes faster (avoid blocking in the stream consumer) + * Use a dedicated task for stream consumption + +**Note:** The `broadcast::Receiver` API (from `subscribe()`) surfaces +lag as `RecvError::Lagged(n)` — the same information, just through +the raw tokio broadcast channel interface rather than the stream. + ### Disabling Value Capture By default, `TableChange` includes `old_values` and `new_values` with the actual diff --git a/crates/sqlx-sqlite-observer/src/change.rs b/crates/sqlx-sqlite-observer/src/change.rs index 3887c6b..03020ec 100644 --- a/crates/sqlx-sqlite-observer/src/change.rs +++ b/crates/sqlx-sqlite-observer/src/change.rs @@ -100,6 +100,30 @@ impl ColumnValue { } } +/// Event yielded by [`TableChangeStream`](crate::stream::TableChangeStream). +/// +/// Most events are `Change` variants containing the actual table change data. +/// A `Lagged` event indicates the consumer fell behind and missed some +/// notifications — consider increasing +/// [`channel_capacity`](crate::config::ObserverConfig::channel_capacity). +#[derive(Debug, Clone)] +pub enum TableChangeEvent { + /// A table change notification. + Change(TableChange), + /// The stream fell behind and missed `n` change notifications. + /// + /// This can happen when: + /// - The consumer is processing changes too slowly relative to the + /// rate of database writes. + /// - A single transaction contains more mutating statements than the + /// channel capacity, causing older messages to be overwritten before + /// the consumer reads them. + /// + /// When this happens, the consumer should assume its local state may + /// be stale and re-query the database for the current state. + Lagged(u64), +} + /// Notification of a change to a database table. /// /// Contains the table name, operation type, affected rowid, and the diff --git a/crates/sqlx-sqlite-observer/src/conn_mgr.rs b/crates/sqlx-sqlite-observer/src/conn_mgr.rs new file mode 100644 index 0000000..c140608 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/conn_mgr.rs @@ -0,0 +1,335 @@ +//! Integration with sqlx-sqlite-conn-mgr crate. +//! +//! This module provides observation capabilities for databases managed by +//! `sqlx-sqlite-conn-mgr`. Enable with the `conn-mgr` feature. +//! +//! Uses SQLite's native hooks for transaction-safe change tracking. Changes +//! are buffered during transactions and only published after commit. +//! +//! # Example +//! +//! ```no_run +//! use std::sync::Arc; +//! use sqlx_sqlite_conn_mgr::SqliteDatabase; +//! use sqlx_sqlite_observer::{ObservableSqliteDatabase, ObserverConfig}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let db = SqliteDatabase::connect("mydb.db", None).await?; +//! let config = ObserverConfig::new().with_tables(["users", "posts"]); +//! let observable = ObservableSqliteDatabase::new(db, config); +//! +//! let mut rx = observable.subscribe(["users"]); +//! +//! // Use observable writer for tracked changes +//! let mut writer = observable.acquire_writer().await?; +//! sqlx::query("BEGIN").execute(&mut *writer).await?; +//! sqlx::query("INSERT INTO users (name) VALUES (?)") +//! .bind("Alice") +//! .execute(&mut *writer) +//! .await?; +//! +//! sqlx::query("COMMIT").execute(&mut *writer).await?; +//! // Changes publish on commit! +//! +//! // Read pool works as normal (no observation needed for reads) +//! let rows = sqlx::query("SELECT * FROM users") +//! .fetch_all(observable.read_pool()?) +//! .await?; +//! +//! Ok(()) +//! } +//! ``` + +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +use libsqlite3_sys::sqlite3; +use sqlx::sqlite::SqliteConnection; +use sqlx::{Pool, Sqlite}; +use sqlx_sqlite_conn_mgr::{SqliteDatabase, WriteGuard}; +use tokio::sync::broadcast; +use tracing::{debug, trace, warn}; + +use crate::Result; +use crate::broker::ObservationBroker; +use crate::change::TableChange; +use crate::config::ObserverConfig; +use crate::hooks; +use crate::schema::query_table_info; +use crate::stream::TableChangeStream; + +/// Wrapper around `SqliteDatabase` that provides change observation. +/// +/// This type integrates with `sqlx-sqlite-conn-mgr` to observe changes made +/// through the write connection while leaving read operations unaffected. +/// Uses SQLite's native hooks for transaction-safe notifications. +pub struct ObservableSqliteDatabase { + db: Arc, + broker: Arc, +} + +impl ObservableSqliteDatabase { + /// Create a new observable database wrapper. + /// + /// # Arguments + /// + /// * `db` - The `SqliteDatabase` instance to observe + /// * `config` - Observer configuration specifying which tables to track + pub fn new(db: Arc, config: ObserverConfig) -> Self { + let broker = ObservationBroker::new(config.channel_capacity, config.capture_values); + + if !config.tables.is_empty() { + broker.observe_tables(config.tables.iter().map(String::as_str)); + } + + Self { db, broker } + } + + /// Subscribe to change notifications. + /// + /// Returns a broadcast receiver that will receive `TableChange` events + /// when observable tables are modified and transactions commit. + pub fn subscribe(&self, tables: I) -> broadcast::Receiver + where + I: IntoIterator, + S: Into, + { + let tables: Vec = tables.into_iter().map(Into::into).collect(); + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + self.broker.subscribe() + } + + /// Subscribe and get a `Stream` for easier async iteration. + pub fn subscribe_stream(&self, tables: I) -> TableChangeStream + where + I: IntoIterator, + S: Into, + { + use crate::stream::TableChangeStreamExt; + let tables: Vec = tables.into_iter().map(Into::into).collect(); + // Register tables for observation (uses references, avoids clone) + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + let rx = self.broker.subscribe(); + let stream = rx.into_stream(); + if tables.is_empty() { + stream + } else { + stream.filter_tables(tables) + } + } + + /// Get a reference to the read-only connection pool. + /// + /// Read operations don't need observation since they don't modify data. + /// However, this pool is also used internally to query table schema + /// information (primary key columns, WITHOUT ROWID status) when tables + /// are first observed. + pub fn read_pool(&self) -> sqlx_sqlite_conn_mgr::Result<&Pool> { + self.db.read_pool() + } + + /// Acquire an observable write guard. + /// + /// The returned `ObservableWriteGuard` has observation hooks registered. + /// Changes are published to subscribers when transactions commit. + /// + /// On first acquisition for each table, queries the schema to determine + /// primary key columns and WITHOUT ROWID status. + pub async fn acquire_writer(&self) -> Result { + let writer = self + .db + .acquire_writer() + .await + .map_err(crate::error::Error::ConnMgr)?; + + let mut observable = ObservableWriteGuard { + writer: Some(writer), + hooks_registered: false, + raw_db: None, + }; + + // Query table info for any observed tables that don't have it yet + self.ensure_table_info().await?; + + observable.register_hooks(Arc::clone(&self.broker)).await?; + Ok(observable) + } + + /// Ensures TableInfo is set for all observed tables. + /// + /// Uses the read pool to query schema information, respecting conn-mgr's + /// requirement that all connections be acquired through it. + async fn ensure_table_info(&self) -> Result<()> { + let observed = self.broker.get_observed_tables(); + + // Collect tables that need schema info + let tables_to_query: Vec = observed + .into_iter() + .filter(|table| self.broker.get_table_info(table).is_none()) + .collect(); + + if tables_to_query.is_empty() { + return Ok(()); + } + + // Use read pool to query schema + let pool = self.db.read_pool().map_err(crate::error::Error::ConnMgr)?; + let mut conn = pool.acquire().await.map_err(crate::error::Error::Sqlx)?; + + for table in tables_to_query { + match query_table_info(&mut conn, &table).await { + Ok(Some(info)) => { + debug!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Queried table info"); + self.broker.set_table_info(&table, info); + } + Ok(None) => { + warn!(table = %table, "Table not found in schema"); + } + Err(e) => { + warn!(table = %table, error = %e, "Failed to query table info"); + } + } + } + + Ok(()) + } + + /// Get the underlying `SqliteDatabase`. + pub fn inner(&self) -> &Arc { + &self.db + } + + /// Get the list of currently observed tables. + pub fn observed_tables(&self) -> Vec { + self.broker.get_observed_tables() + } + + /// Returns a reference to the underlying observation broker. + pub fn broker(&self) -> &Arc { + &self.broker + } +} + +impl Clone for ObservableSqliteDatabase { + fn clone(&self) -> Self { + Self { + db: Arc::clone(&self.db), + broker: Arc::clone(&self.broker), + } + } +} + +/// RAII guard for observable write access to the database. +/// +/// This guard wraps a `WriteGuard` from `sqlx-sqlite-conn-mgr` and adds +/// change tracking via SQLite hooks. Changes are published to subscribers +/// when transactions commit. +#[must_use = "if unused, the write lock is immediately released"] +pub struct ObservableWriteGuard { + writer: Option, + hooks_registered: bool, + /// Raw sqlite3 pointer, cached during register_hooks so we can + /// call unregister_hooks synchronously in Drop without needing + /// the async lock_handle. + raw_db: Option<*mut sqlite3>, +} + +// SAFETY: The raw_db pointer is only used for hook registration/unregistration +// and is always accessed from the same logical owner. The underlying sqlite3 +// connection is already Send via sqlx's PoolConnection. +unsafe impl Send for ObservableWriteGuard {} + +impl ObservableWriteGuard { + fn writer_mut(&mut self) -> &mut WriteGuard { + self.writer.as_mut().expect("writer already taken") + } + + /// Registers SQLite observation hooks on this writer. + async fn register_hooks(&mut self, broker: Arc) -> Result<()> { + if self.hooks_registered { + return Ok(()); + } + + debug!("Registering SQLite observation hooks on WriteGuard"); + + let writer = self.writer.as_mut().expect("writer already taken"); + + // Get raw SQLite handle + let mut handle = writer + .lock_handle() + .await + .map_err(|e| crate::Error::Database(format!("Failed to lock connection handle: {}", e)))?; + + let db: *mut sqlite3 = handle.as_raw_handle().as_ptr(); + + unsafe { + hooks::register_hooks(db, broker)?; + } + + // Cache the raw pointer so Drop can call unregister_hooks synchronously. + // SAFETY: The pointer remains valid for the lifetime of the WriteGuard, + // which we own via self.writer. + self.raw_db = Some(db); + self.hooks_registered = true; + Ok(()) + } + + /// Consumes this wrapper and returns the underlying write guard. + /// + /// Hooks are unregistered before returning the guard, so it can be + /// safely used without observation. + pub fn into_inner(mut self) -> WriteGuard { + // Unregister hooks before returning the writer to prevent + // use-after-free if the broker is dropped before the connection is reused. + if self.hooks_registered + && let Some(db) = self.raw_db + { + unsafe { + crate::hooks::unregister_hooks(db); + } + trace!("Hooks unregistered before returning inner WriteGuard"); + } + self.hooks_registered = false; + self.raw_db = None; + self.writer.take().expect("writer already taken") + } +} + +impl Drop for ObservableWriteGuard { + fn drop(&mut self) { + if self.hooks_registered + && let Some(db) = self.raw_db + { + // SAFETY: db was obtained from lock_handle during register_hooks and + // remains valid because we still own the WriteGuard (self.writer). + // The writer has not been taken (into_inner clears hooks_registered). + unsafe { + hooks::unregister_hooks(db); + } + trace!("ObservableWriteGuard dropped, hooks unregistered"); + } + } +} + +impl Deref for ObservableWriteGuard { + type Target = SqliteConnection; + + fn deref(&self) -> &Self::Target { + self.writer.as_ref().expect("writer already taken") + } +} + +impl DerefMut for ObservableWriteGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + self.writer_mut() + } +} diff --git a/crates/sqlx-sqlite-observer/src/error.rs b/crates/sqlx-sqlite-observer/src/error.rs index 6d4baee..e06cb5e 100644 --- a/crates/sqlx-sqlite-observer/src/error.rs +++ b/crates/sqlx-sqlite-observer/src/error.rs @@ -15,6 +15,11 @@ pub enum Error { #[error("Failed to acquire connection from pool")] PoolAcquire, + /// Connection manager error. + #[cfg(feature = "conn-mgr")] + #[error("Connection manager error: {0}")] + ConnMgr(#[from] sqlx_sqlite_conn_mgr::Error), + /// Database error (non-sqlx). #[error("Database error: {0}")] Database(String), diff --git a/crates/sqlx-sqlite-observer/src/lib.rs b/crates/sqlx-sqlite-observer/src/lib.rs index cd881e4..c21bf30 100644 --- a/crates/sqlx-sqlite-observer/src/lib.rs +++ b/crates/sqlx-sqlite-observer/src/lib.rs @@ -81,7 +81,9 @@ //! ```rust,no_run //! use futures::StreamExt; //! use sqlx::SqlitePool; -//! use sqlx_sqlite_observer::{ChangeOperation, SqliteObserver, ObserverConfig, TableChangeStreamExt}; +//! use sqlx_sqlite_observer::{ +//! SqliteObserver, ObserverConfig, TableChangeEvent, TableChangeStreamExt, +//! }; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -92,20 +94,26 @@ //! // Get a Stream instead of broadcast::Receiver //! let mut stream = observer.subscribe_stream(["users"]); //! -//! // Use standard Stream combinators -//! while let Some(change) = stream.next().await { -//! println!( -//! "Table {} row {} was {:?}", -//! change.table, -//! change.rowid.unwrap_or(-1), -//! change.operation -//! ); -//! // Access typed column values -//! if let Some(old) = &change.old_values { -//! println!(" Old values: {:?}", old); -//! } -//! if let Some(new) = &change.new_values { -//! println!(" New values: {:?}", new); +//! // Stream yields TableChangeEvent variants +//! while let Some(event) = stream.next().await { +//! match event { +//! TableChangeEvent::Change(change) => { +//! println!( +//! "Table {} row {} was {:?}", +//! change.table, +//! change.rowid.unwrap_or(-1), +//! change.operation +//! ); +//! if let Some(old) = &change.old_values { +//! println!(" Old values: {:?}", old); +//! } +//! if let Some(new) = &change.new_values { +//! println!(" New values: {:?}", new); +//! } +//! } +//! TableChangeEvent::Lagged(n) => { +//! eprintln!("Missed {} notifications, re-query state", n); +//! } //! } //! } //! @@ -123,8 +131,11 @@ pub mod observer; pub mod schema; pub mod stream; +#[cfg(feature = "conn-mgr")] +pub mod conn_mgr; + pub use broker::ObservationBroker; -pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; +pub use change::{ChangeOperation, ColumnValue, TableChange, TableChangeEvent, TableInfo}; pub use config::ObserverConfig; pub use connection::ObservableConnection; pub use error::Error; @@ -132,4 +143,7 @@ pub use hooks::{SqliteValue, is_preupdate_hook_enabled, unregister_hooks}; pub use observer::SqliteObserver; pub use stream::{TableChangeStream, TableChangeStreamExt}; +#[cfg(feature = "conn-mgr")] +pub use conn_mgr::{ObservableSqliteDatabase, ObservableWriteGuard}; + pub type Result = std::result::Result; diff --git a/crates/sqlx-sqlite-observer/src/stream.rs b/crates/sqlx-sqlite-observer/src/stream.rs index 3e33525..f488048 100644 --- a/crates/sqlx-sqlite-observer/src/stream.rs +++ b/crates/sqlx-sqlite-observer/src/stream.rs @@ -6,7 +6,7 @@ use tokio_stream::Stream; use tokio_stream::wrappers::BroadcastStream; use tracing::warn; -use crate::change::TableChange; +use crate::change::{TableChange, TableChangeEvent}; /// A filtered stream of table change notifications. /// @@ -32,7 +32,7 @@ impl TableChangeStream { } impl Stream for TableChangeStream { - type Item = TableChange; + type Item = TableChangeEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -46,15 +46,17 @@ impl Stream for TableChangeStream { { continue; } - return Poll::Ready(Some(change)); + return Poll::Ready(Some(TableChangeEvent::Change(change))); } - Poll::Ready(Some(Err(err))) => { - // Lagged error - missed some messages due to slow consumption + Poll::Ready(Some(Err( + tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(count), + ))) => { warn!( - error = %err, - "Stream lagged — missed change notifications. Consider increasing channel_capacity." + missed = count, + "Stream lagged — missed change notifications. \ + Consider increasing channel_capacity." ); - continue; + return Poll::Ready(Some(TableChangeEvent::Lagged(count))); } Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => return Poll::Pending, diff --git a/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs new file mode 100644 index 0000000..0951376 --- /dev/null +++ b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs @@ -0,0 +1,343 @@ +//! Integration tests for conn-mgr feature (sqlx-sqlite-conn-mgr integration). +//! +//! Tests verify the same behaviors as integration_tests.rs but using +//! `ObservableSqliteDatabase` instead of `SqliteObserver`. +//! +//! Run with: cargo test --features conn-mgr + +#![cfg(feature = "conn-mgr")] + +use futures::StreamExt; +use sqlx_sqlite_conn_mgr::SqliteDatabase; +use sqlx_sqlite_observer::{ChangeOperation, ObservableSqliteDatabase, ObserverConfig}; +use std::time::Duration; +use tokio::time::timeout; + +struct TestDb { + db: std::sync::Arc, + _temp_file: tempfile::NamedTempFile, +} + +async fn setup_test_db() -> TestDb { + // Use temp file so read pool and writer share the same database + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let db = SqliteDatabase::connect(temp_file.path().to_str().unwrap(), None) + .await + .unwrap(); + + // Create test tables using writer + let mut writer = db.acquire_writer().await.unwrap(); + sqlx::query( + r#" + CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL + ) + "#, + ) + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query( + r#" + CREATE TABLE posts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + title TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) + ) + "#, + ) + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + TestDb { + db, + _temp_file: temp_file, + } +} + +// ============================================================================ +// Observable Lifecycle +// ============================================================================ + +#[tokio::test] +async fn test_observable_starts_with_configured_tables() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db, config); + + assert_eq!(observable.observed_tables().len(), 1); + assert!(observable.observed_tables().contains(&"users".to_string())); +} + +// ============================================================================ +// Transaction Semantics +// ============================================================================ + +#[tokio::test] +async fn test_commit_publishes_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Should receive notification after commit"); + + let change = result.unwrap().unwrap(); + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Insert)); +} + +#[tokio::test] +async fn test_uncommitted_changes_not_published() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + + { + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Bob')") + .execute(&mut *writer) + .await + .unwrap(); + // No COMMIT - implicit rollback on drop + } + + tokio::time::sleep(Duration::from_millis(50)).await; + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "Should NOT notify for uncommitted changes"); +} + +#[tokio::test] +async fn test_rollback_discards_changes() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Charlie')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("ROLLBACK").execute(&mut *writer).await.unwrap(); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "Should NOT notify for rolled-back changes"); +} + +// ============================================================================ +// CRUD Operations +// ============================================================================ + +#[tokio::test] +async fn test_update_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Seed data + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("UPDATE users SET name = 'Bob' WHERE id = 1") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Update)); +} + +#[tokio::test] +async fn test_delete_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Seed data + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("DELETE FROM users WHERE id = 1") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Delete)); +} + +// ============================================================================ +// Read Pool +// ============================================================================ + +#[tokio::test] +async fn test_read_pool_sees_committed_writes() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Insert via writer + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Diana')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + // Read via read_pool + let rows: Vec<(i64, String)> = sqlx::query_as("SELECT id, name FROM users") + .fetch_all(observable.read_pool().unwrap()) + .await + .unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].1, "Diana"); +} + +// ============================================================================ +// Multi-Subscriber & Clone +// ============================================================================ + +#[tokio::test] +async fn test_all_subscribers_receive_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx1 = observable.subscribe(["users"]); + let mut rx2 = observable.subscribe(["users"]); + + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result1 = timeout(Duration::from_millis(100), rx1.recv()).await; + let result2 = timeout(Duration::from_millis(100), rx2.recv()).await; + + assert!(result1.is_ok(), "Subscriber 1 receives notification"); + assert!(result2.is_ok(), "Subscriber 2 receives notification"); +} + +#[tokio::test] +async fn test_cloned_observable_shares_state() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable1 = ObservableSqliteDatabase::new(test_db.db.clone(), config); + let observable2 = observable1.clone(); + + // Subscribe on original, write through clone + let mut rx = observable1.subscribe(["users"]); + let mut writer = observable2.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Frank')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Receives notification through clone"); +} + +// ============================================================================ +// Stream API +// ============================================================================ + +#[tokio::test] +async fn test_stream_receives_notifications() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut stream = observable.subscribe_stream(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Eve')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), stream.next()).await; + assert!(result.is_ok(), "Stream receives notification"); + + let event = result.unwrap().unwrap(); + match event { + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + } + sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => { + panic!("Expected Change event, got Lagged"); + } + } +} diff --git a/crates/sqlx-sqlite-observer/tests/integration_tests.rs b/crates/sqlx-sqlite-observer/tests/integration_tests.rs index f993be8..5a7c267 100644 --- a/crates/sqlx-sqlite-observer/tests/integration_tests.rs +++ b/crates/sqlx-sqlite-observer/tests/integration_tests.rs @@ -391,8 +391,15 @@ async fn test_stream_receives_notifications() { let result = timeout(Duration::from_millis(100), stream.next()).await; assert!(result.is_ok(), "Stream receives notification"); - let change = result.unwrap().unwrap(); - assert_eq!(change.table, "users"); + let event = result.unwrap().unwrap(); + match event { + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + } + sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => { + panic!("Expected Change event, got Lagged"); + } + } } #[tokio::test] @@ -424,6 +431,50 @@ async fn test_stream_filters_tables() { assert!(result.is_err(), "Stream filters out non-subscribed tables"); } +#[tokio::test] +async fn test_stream_lag_when_capacity_exceeded() { + let pool = setup_test_db().await; + let config = ObserverConfig::new() + .with_tables(["users"]) + .with_channel_capacity(2); + + let observer = SqliteObserver::new(pool, config); + + let mut stream = observer.subscribe_stream(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + // Insert more rows than the channel capacity in a single transaction + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + for i in 0..5 { + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind(format!("User{}", i)) + .execute(&mut **conn) + .await + .unwrap(); + } + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let mut saw_lagged = false; + let mut saw_change = false; + + // Drain all available events + while let Ok(Some(event)) = timeout(Duration::from_millis(100), stream.next()).await { + match event { + sqlx_sqlite_observer::TableChangeEvent::Lagged(n) => { + assert!(n > 0, "Lagged count should be > 0"); + saw_lagged = true; + } + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + saw_change = true; + } + } + } + + assert!(saw_lagged, "Expected at least one Lagged event"); + assert!(saw_change, "Expected at least one Change event"); +} + // ============================================================================ // Value Capture // ============================================================================