From d4069f311511e9aa83a60822701792aa4a4bf264 Mon Sep 17 00:00:00 2001 From: Paul Morris <10599524+pmorris-dev@users.noreply.github.com> Date: Fri, 6 Feb 2026 20:14:52 -0500 Subject: [PATCH 1/2] feat: add integration with conn-mgr crate - we wrapped SqliteDatabase and WriteGuard and, otherwise implemented seamless interop between observer and conn-mgr --- Cargo.lock | 16 +- crates/sqlx-sqlite-observer/Cargo.toml | 2 + crates/sqlx-sqlite-observer/src/conn_mgr.rs | 335 +++++++++++++++++ crates/sqlx-sqlite-observer/src/error.rs | 5 + crates/sqlx-sqlite-observer/src/lib.rs | 6 + .../tests/conn_mgr_tests.rs | 336 ++++++++++++++++++ 6 files changed, 699 insertions(+), 1 deletion(-) create mode 100644 crates/sqlx-sqlite-observer/src/conn_mgr.rs create mode 100644 crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 5b71b00..16bae15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3741,6 +3741,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlx-sqlite-conn-mgr" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d1ee0837ee4c49036a64632adf814c1799ec54cc0ba31712d263ce12c09ad45" +dependencies = [ + "serde", + "sqlx", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "sqlx-sqlite-observer" version = "0.8.6" @@ -3750,6 +3763,7 @@ dependencies = [ "parking_lot", "regex", "sqlx", + "sqlx-sqlite-conn-mgr 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile", "thiserror 2.0.17", "tokio", @@ -4076,7 +4090,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "sqlx-sqlite-conn-mgr", + "sqlx-sqlite-conn-mgr 0.8.6", "tauri", "tauri-plugin", "tempfile", diff --git a/crates/sqlx-sqlite-observer/Cargo.toml b/crates/sqlx-sqlite-observer/Cargo.toml index afdae19..b0b30d0 100644 --- a/crates/sqlx-sqlite-observer/Cargo.toml +++ b/crates/sqlx-sqlite-observer/Cargo.toml @@ -13,6 +13,7 @@ keywords = ["sqlite", "sqlx", "reactive", "observer", "database"] categories = ["database", "asynchronous"] [features] +conn-mgr = ["dep:sqlx-sqlite-conn-mgr"] # Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK # which most system SQLite libraries don't have enabled. default = ["bundled"] @@ -28,6 +29,7 @@ regex = "1.12.3" sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false } # Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] } +sqlx-sqlite-conn-mgr = { version = "0.8.6", optional = true } [dev-dependencies] tokio = { version = "1.49.0", features = ["full", "macros"] } diff --git a/crates/sqlx-sqlite-observer/src/conn_mgr.rs b/crates/sqlx-sqlite-observer/src/conn_mgr.rs new file mode 100644 index 0000000..c140608 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/conn_mgr.rs @@ -0,0 +1,335 @@ +//! Integration with sqlx-sqlite-conn-mgr crate. +//! +//! This module provides observation capabilities for databases managed by +//! `sqlx-sqlite-conn-mgr`. Enable with the `conn-mgr` feature. +//! +//! Uses SQLite's native hooks for transaction-safe change tracking. Changes +//! are buffered during transactions and only published after commit. +//! +//! # Example +//! +//! ```no_run +//! use std::sync::Arc; +//! use sqlx_sqlite_conn_mgr::SqliteDatabase; +//! use sqlx_sqlite_observer::{ObservableSqliteDatabase, ObserverConfig}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let db = SqliteDatabase::connect("mydb.db", None).await?; +//! let config = ObserverConfig::new().with_tables(["users", "posts"]); +//! let observable = ObservableSqliteDatabase::new(db, config); +//! +//! let mut rx = observable.subscribe(["users"]); +//! +//! // Use observable writer for tracked changes +//! let mut writer = observable.acquire_writer().await?; +//! sqlx::query("BEGIN").execute(&mut *writer).await?; +//! sqlx::query("INSERT INTO users (name) VALUES (?)") +//! .bind("Alice") +//! .execute(&mut *writer) +//! .await?; +//! +//! sqlx::query("COMMIT").execute(&mut *writer).await?; +//! // Changes publish on commit! +//! +//! // Read pool works as normal (no observation needed for reads) +//! let rows = sqlx::query("SELECT * FROM users") +//! .fetch_all(observable.read_pool()?) +//! .await?; +//! +//! Ok(()) +//! } +//! ``` + +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +use libsqlite3_sys::sqlite3; +use sqlx::sqlite::SqliteConnection; +use sqlx::{Pool, Sqlite}; +use sqlx_sqlite_conn_mgr::{SqliteDatabase, WriteGuard}; +use tokio::sync::broadcast; +use tracing::{debug, trace, warn}; + +use crate::Result; +use crate::broker::ObservationBroker; +use crate::change::TableChange; +use crate::config::ObserverConfig; +use crate::hooks; +use crate::schema::query_table_info; +use crate::stream::TableChangeStream; + +/// Wrapper around `SqliteDatabase` that provides change observation. +/// +/// This type integrates with `sqlx-sqlite-conn-mgr` to observe changes made +/// through the write connection while leaving read operations unaffected. +/// Uses SQLite's native hooks for transaction-safe notifications. +pub struct ObservableSqliteDatabase { + db: Arc, + broker: Arc, +} + +impl ObservableSqliteDatabase { + /// Create a new observable database wrapper. + /// + /// # Arguments + /// + /// * `db` - The `SqliteDatabase` instance to observe + /// * `config` - Observer configuration specifying which tables to track + pub fn new(db: Arc, config: ObserverConfig) -> Self { + let broker = ObservationBroker::new(config.channel_capacity, config.capture_values); + + if !config.tables.is_empty() { + broker.observe_tables(config.tables.iter().map(String::as_str)); + } + + Self { db, broker } + } + + /// Subscribe to change notifications. + /// + /// Returns a broadcast receiver that will receive `TableChange` events + /// when observable tables are modified and transactions commit. + pub fn subscribe(&self, tables: I) -> broadcast::Receiver + where + I: IntoIterator, + S: Into, + { + let tables: Vec = tables.into_iter().map(Into::into).collect(); + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + self.broker.subscribe() + } + + /// Subscribe and get a `Stream` for easier async iteration. + pub fn subscribe_stream(&self, tables: I) -> TableChangeStream + where + I: IntoIterator, + S: Into, + { + use crate::stream::TableChangeStreamExt; + let tables: Vec = tables.into_iter().map(Into::into).collect(); + // Register tables for observation (uses references, avoids clone) + if !tables.is_empty() { + self + .broker + .observe_tables(tables.iter().map(String::as_str)); + } + let rx = self.broker.subscribe(); + let stream = rx.into_stream(); + if tables.is_empty() { + stream + } else { + stream.filter_tables(tables) + } + } + + /// Get a reference to the read-only connection pool. + /// + /// Read operations don't need observation since they don't modify data. + /// However, this pool is also used internally to query table schema + /// information (primary key columns, WITHOUT ROWID status) when tables + /// are first observed. + pub fn read_pool(&self) -> sqlx_sqlite_conn_mgr::Result<&Pool> { + self.db.read_pool() + } + + /// Acquire an observable write guard. + /// + /// The returned `ObservableWriteGuard` has observation hooks registered. + /// Changes are published to subscribers when transactions commit. + /// + /// On first acquisition for each table, queries the schema to determine + /// primary key columns and WITHOUT ROWID status. + pub async fn acquire_writer(&self) -> Result { + let writer = self + .db + .acquire_writer() + .await + .map_err(crate::error::Error::ConnMgr)?; + + let mut observable = ObservableWriteGuard { + writer: Some(writer), + hooks_registered: false, + raw_db: None, + }; + + // Query table info for any observed tables that don't have it yet + self.ensure_table_info().await?; + + observable.register_hooks(Arc::clone(&self.broker)).await?; + Ok(observable) + } + + /// Ensures TableInfo is set for all observed tables. + /// + /// Uses the read pool to query schema information, respecting conn-mgr's + /// requirement that all connections be acquired through it. + async fn ensure_table_info(&self) -> Result<()> { + let observed = self.broker.get_observed_tables(); + + // Collect tables that need schema info + let tables_to_query: Vec = observed + .into_iter() + .filter(|table| self.broker.get_table_info(table).is_none()) + .collect(); + + if tables_to_query.is_empty() { + return Ok(()); + } + + // Use read pool to query schema + let pool = self.db.read_pool().map_err(crate::error::Error::ConnMgr)?; + let mut conn = pool.acquire().await.map_err(crate::error::Error::Sqlx)?; + + for table in tables_to_query { + match query_table_info(&mut conn, &table).await { + Ok(Some(info)) => { + debug!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Queried table info"); + self.broker.set_table_info(&table, info); + } + Ok(None) => { + warn!(table = %table, "Table not found in schema"); + } + Err(e) => { + warn!(table = %table, error = %e, "Failed to query table info"); + } + } + } + + Ok(()) + } + + /// Get the underlying `SqliteDatabase`. + pub fn inner(&self) -> &Arc { + &self.db + } + + /// Get the list of currently observed tables. + pub fn observed_tables(&self) -> Vec { + self.broker.get_observed_tables() + } + + /// Returns a reference to the underlying observation broker. + pub fn broker(&self) -> &Arc { + &self.broker + } +} + +impl Clone for ObservableSqliteDatabase { + fn clone(&self) -> Self { + Self { + db: Arc::clone(&self.db), + broker: Arc::clone(&self.broker), + } + } +} + +/// RAII guard for observable write access to the database. +/// +/// This guard wraps a `WriteGuard` from `sqlx-sqlite-conn-mgr` and adds +/// change tracking via SQLite hooks. Changes are published to subscribers +/// when transactions commit. +#[must_use = "if unused, the write lock is immediately released"] +pub struct ObservableWriteGuard { + writer: Option, + hooks_registered: bool, + /// Raw sqlite3 pointer, cached during register_hooks so we can + /// call unregister_hooks synchronously in Drop without needing + /// the async lock_handle. + raw_db: Option<*mut sqlite3>, +} + +// SAFETY: The raw_db pointer is only used for hook registration/unregistration +// and is always accessed from the same logical owner. The underlying sqlite3 +// connection is already Send via sqlx's PoolConnection. +unsafe impl Send for ObservableWriteGuard {} + +impl ObservableWriteGuard { + fn writer_mut(&mut self) -> &mut WriteGuard { + self.writer.as_mut().expect("writer already taken") + } + + /// Registers SQLite observation hooks on this writer. + async fn register_hooks(&mut self, broker: Arc) -> Result<()> { + if self.hooks_registered { + return Ok(()); + } + + debug!("Registering SQLite observation hooks on WriteGuard"); + + let writer = self.writer.as_mut().expect("writer already taken"); + + // Get raw SQLite handle + let mut handle = writer + .lock_handle() + .await + .map_err(|e| crate::Error::Database(format!("Failed to lock connection handle: {}", e)))?; + + let db: *mut sqlite3 = handle.as_raw_handle().as_ptr(); + + unsafe { + hooks::register_hooks(db, broker)?; + } + + // Cache the raw pointer so Drop can call unregister_hooks synchronously. + // SAFETY: The pointer remains valid for the lifetime of the WriteGuard, + // which we own via self.writer. + self.raw_db = Some(db); + self.hooks_registered = true; + Ok(()) + } + + /// Consumes this wrapper and returns the underlying write guard. + /// + /// Hooks are unregistered before returning the guard, so it can be + /// safely used without observation. + pub fn into_inner(mut self) -> WriteGuard { + // Unregister hooks before returning the writer to prevent + // use-after-free if the broker is dropped before the connection is reused. + if self.hooks_registered + && let Some(db) = self.raw_db + { + unsafe { + crate::hooks::unregister_hooks(db); + } + trace!("Hooks unregistered before returning inner WriteGuard"); + } + self.hooks_registered = false; + self.raw_db = None; + self.writer.take().expect("writer already taken") + } +} + +impl Drop for ObservableWriteGuard { + fn drop(&mut self) { + if self.hooks_registered + && let Some(db) = self.raw_db + { + // SAFETY: db was obtained from lock_handle during register_hooks and + // remains valid because we still own the WriteGuard (self.writer). + // The writer has not been taken (into_inner clears hooks_registered). + unsafe { + hooks::unregister_hooks(db); + } + trace!("ObservableWriteGuard dropped, hooks unregistered"); + } + } +} + +impl Deref for ObservableWriteGuard { + type Target = SqliteConnection; + + fn deref(&self) -> &Self::Target { + self.writer.as_ref().expect("writer already taken") + } +} + +impl DerefMut for ObservableWriteGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + self.writer_mut() + } +} diff --git a/crates/sqlx-sqlite-observer/src/error.rs b/crates/sqlx-sqlite-observer/src/error.rs index 6d4baee..e06cb5e 100644 --- a/crates/sqlx-sqlite-observer/src/error.rs +++ b/crates/sqlx-sqlite-observer/src/error.rs @@ -15,6 +15,11 @@ pub enum Error { #[error("Failed to acquire connection from pool")] PoolAcquire, + /// Connection manager error. + #[cfg(feature = "conn-mgr")] + #[error("Connection manager error: {0}")] + ConnMgr(#[from] sqlx_sqlite_conn_mgr::Error), + /// Database error (non-sqlx). #[error("Database error: {0}")] Database(String), diff --git a/crates/sqlx-sqlite-observer/src/lib.rs b/crates/sqlx-sqlite-observer/src/lib.rs index cd881e4..ac1f16d 100644 --- a/crates/sqlx-sqlite-observer/src/lib.rs +++ b/crates/sqlx-sqlite-observer/src/lib.rs @@ -123,6 +123,9 @@ pub mod observer; pub mod schema; pub mod stream; +#[cfg(feature = "conn-mgr")] +pub mod conn_mgr; + pub use broker::ObservationBroker; pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; pub use config::ObserverConfig; @@ -132,4 +135,7 @@ pub use hooks::{SqliteValue, is_preupdate_hook_enabled, unregister_hooks}; pub use observer::SqliteObserver; pub use stream::{TableChangeStream, TableChangeStreamExt}; +#[cfg(feature = "conn-mgr")] +pub use conn_mgr::{ObservableSqliteDatabase, ObservableWriteGuard}; + pub type Result = std::result::Result; diff --git a/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs new file mode 100644 index 0000000..e431640 --- /dev/null +++ b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs @@ -0,0 +1,336 @@ +//! Integration tests for conn-mgr feature (sqlx-sqlite-conn-mgr integration). +//! +//! Tests verify the same behaviors as integration_tests.rs but using +//! `ObservableSqliteDatabase` instead of `SqliteObserver`. +//! +//! Run with: cargo test --features conn-mgr + +#![cfg(feature = "conn-mgr")] + +use futures::StreamExt; +use sqlx_sqlite_conn_mgr::SqliteDatabase; +use sqlx_sqlite_observer::{ChangeOperation, ObservableSqliteDatabase, ObserverConfig}; +use std::time::Duration; +use tokio::time::timeout; + +struct TestDb { + db: std::sync::Arc, + _temp_file: tempfile::NamedTempFile, +} + +async fn setup_test_db() -> TestDb { + // Use temp file so read pool and writer share the same database + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let db = SqliteDatabase::connect(temp_file.path().to_str().unwrap(), None) + .await + .unwrap(); + + // Create test tables using writer + let mut writer = db.acquire_writer().await.unwrap(); + sqlx::query( + r#" + CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL + ) + "#, + ) + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query( + r#" + CREATE TABLE posts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + title TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) + ) + "#, + ) + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + TestDb { + db, + _temp_file: temp_file, + } +} + +// ============================================================================ +// Observable Lifecycle +// ============================================================================ + +#[tokio::test] +async fn test_observable_starts_with_configured_tables() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db, config); + + assert_eq!(observable.observed_tables().len(), 1); + assert!(observable.observed_tables().contains(&"users".to_string())); +} + +// ============================================================================ +// Transaction Semantics +// ============================================================================ + +#[tokio::test] +async fn test_commit_publishes_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Should receive notification after commit"); + + let change = result.unwrap().unwrap(); + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Insert)); +} + +#[tokio::test] +async fn test_uncommitted_changes_not_published() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + + { + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Bob')") + .execute(&mut *writer) + .await + .unwrap(); + // No COMMIT - implicit rollback on drop + } + + tokio::time::sleep(Duration::from_millis(50)).await; + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "Should NOT notify for uncommitted changes"); +} + +#[tokio::test] +async fn test_rollback_discards_changes() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Charlie')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("ROLLBACK").execute(&mut *writer).await.unwrap(); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let result = timeout(Duration::from_millis(50), rx.recv()).await; + assert!(result.is_err(), "Should NOT notify for rolled-back changes"); +} + +// ============================================================================ +// CRUD Operations +// ============================================================================ + +#[tokio::test] +async fn test_update_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Seed data + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("UPDATE users SET name = 'Bob' WHERE id = 1") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Update)); +} + +#[tokio::test] +async fn test_delete_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Seed data + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + let mut rx = observable.subscribe(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("DELETE FROM users WHERE id = 1") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let change = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(change.table, "users"); + assert_eq!(change.operation, Some(ChangeOperation::Delete)); +} + +// ============================================================================ +// Read Pool +// ============================================================================ + +#[tokio::test] +async fn test_read_pool_sees_committed_writes() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + // Insert via writer + let mut writer = observable.acquire_writer().await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Diana')") + .execute(&mut *writer) + .await + .unwrap(); + + drop(writer); + + // Read via read_pool + let rows: Vec<(i64, String)> = sqlx::query_as("SELECT id, name FROM users") + .fetch_all(observable.read_pool().unwrap()) + .await + .unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].1, "Diana"); +} + +// ============================================================================ +// Multi-Subscriber & Clone +// ============================================================================ + +#[tokio::test] +async fn test_all_subscribers_receive_notification() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut rx1 = observable.subscribe(["users"]); + let mut rx2 = observable.subscribe(["users"]); + + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Alice')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result1 = timeout(Duration::from_millis(100), rx1.recv()).await; + let result2 = timeout(Duration::from_millis(100), rx2.recv()).await; + + assert!(result1.is_ok(), "Subscriber 1 receives notification"); + assert!(result2.is_ok(), "Subscriber 2 receives notification"); +} + +#[tokio::test] +async fn test_cloned_observable_shares_state() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable1 = ObservableSqliteDatabase::new(test_db.db.clone(), config); + let observable2 = observable1.clone(); + + // Subscribe on original, write through clone + let mut rx = observable1.subscribe(["users"]); + let mut writer = observable2.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Frank')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(result.is_ok(), "Receives notification through clone"); +} + +// ============================================================================ +// Stream API +// ============================================================================ + +#[tokio::test] +async fn test_stream_receives_notifications() { + let test_db = setup_test_db().await; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(test_db.db.clone(), config); + + let mut stream = observable.subscribe_stream(["users"]); + let mut writer = observable.acquire_writer().await.unwrap(); + + sqlx::query("BEGIN").execute(&mut *writer).await.unwrap(); + sqlx::query("INSERT INTO users (name) VALUES ('Eve')") + .execute(&mut *writer) + .await + .unwrap(); + + sqlx::query("COMMIT").execute(&mut *writer).await.unwrap(); + + let result = timeout(Duration::from_millis(100), stream.next()).await; + assert!(result.is_ok(), "Stream receives notification"); + + let change = result.unwrap().unwrap(); + assert_eq!(change.table, "users"); +} From a149af9f5ac6ae11f8429ebdf13174475f566baa Mon Sep 17 00:00:00 2001 From: Paul Morris <10599524+pmorris-dev@users.noreply.github.com> Date: Fri, 6 Feb 2026 20:40:06 -0500 Subject: [PATCH 2/2] sub(feat): allow user to handle stream lag - Previously we were just logging a warning when the number of changes in a tx exceeded the channel capacity. Now we report the lag in the change event to make it more obvious to the user and allow for the user to respond to this "error" condition. --- crates/sqlx-sqlite-observer/README.md | 119 ++++++++++++++++-- crates/sqlx-sqlite-observer/src/change.rs | 24 ++++ crates/sqlx-sqlite-observer/src/lib.rs | 40 +++--- crates/sqlx-sqlite-observer/src/stream.rs | 18 +-- .../tests/conn_mgr_tests.rs | 11 +- .../tests/integration_tests.rs | 55 +++++++- 6 files changed, 226 insertions(+), 41 deletions(-) diff --git a/crates/sqlx-sqlite-observer/README.md b/crates/sqlx-sqlite-observer/README.md index 11ddb8e..a751792 100644 --- a/crates/sqlx-sqlite-observer/README.md +++ b/crates/sqlx-sqlite-observer/README.md @@ -95,6 +95,8 @@ This ensures subscribers **only receive notifications for committed changes**. ### Core Types * **`TableChange`**: Notification of a change to a database table + * **`TableChangeEvent`**: Event yielded by `TableChangeStream` — + either `Change(TableChange)` or `Lagged(u64)` * **`ChangeOperation`**: Insert, Update, or Delete * **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob) * **`ObserverConfig`**: Configuration for table filtering and channel @@ -111,7 +113,7 @@ This ensures subscribers **only receive notifications for committed changes**. * **`TableChangeStreamExt`**: Extension trait for converting receivers to streams -### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) +### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) * **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation * **`ObservableWriteGuard`**: Write guard with hooks registered @@ -179,7 +181,7 @@ meaningful/correct for non-integer or composite primary keys. ### Basic Usage -```rust,no_run +```rust use sqlx::SqlitePool; use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; @@ -219,10 +221,12 @@ async fn main() -> Result<(), Box> { ### Stream API -```rust,no_run +```rust use futures::StreamExt; use sqlx::SqlitePool; -use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig}; +use sqlx_sqlite_observer::{ + SqliteObserver, ObserverConfig, TableChangeEvent, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -232,13 +236,20 @@ async fn main() -> Result<(), Box> { let mut stream = observer.subscribe_stream(["users"]); - while let Some(change) = stream.next().await { - println!( - "Table {} row {} was {:?}", - change.table, - change.rowid.unwrap_or(-1), - change.operation - ); + while let Some(event) = stream.next().await { + match event { + TableChangeEvent::Change(change) => { + println!( + "Table {} row {} was {:?}", + change.table, + change.rowid.unwrap_or(-1), + change.operation + ); + } + TableChangeEvent::Lagged(n) => { + eprintln!("Missed {} notifications", n); + } + } } Ok(()) @@ -247,7 +258,7 @@ async fn main() -> Result<(), Box> { ### Value Capture -```rust,no_run +```rust use sqlx::SqlitePool; use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue}; @@ -284,7 +295,37 @@ async fn main() -> Result<(), Box> { ### SQLx SQLite Connection Manager Integration - +```rust +use std::sync::Arc; +use sqlx_sqlite_conn_mgr::SqliteDatabase; +use sqlx_sqlite_observer::{ + ObservableSqliteDatabase, ObserverConfig, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let db = SqliteDatabase::connect("mydb.db", None).await?; + let config = ObserverConfig::new().with_tables(["users"]); + let observable = ObservableSqliteDatabase::new(db, config); + + let mut rx = observable.subscribe(["users"]); + + // Write through the observable writer + let mut writer = observable.acquire_writer().await?; + sqlx::query("BEGIN").execute(&mut *writer).await?; + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind("Alice") + .execute(&mut *writer) + .await?; + sqlx::query("COMMIT").execute(&mut *writer).await?; + + // Notification arrives after commit + let change = rx.recv().await?; + println!("Changed: {}", change.table); + + Ok(()) +} +``` ## Usage Notes @@ -301,6 +342,58 @@ let config = ObserverConfig::new() .with_channel_capacity(1000); // Handle large transactions ``` +### Handling Lag + +When using the Stream API, the stream yields `TableChangeEvent` values. +Most events are `Change` variants, but if a consumer falls behind, the +stream yields a `Lagged(n)` event indicating how many notifications +were missed. + +```rust +use futures::StreamExt; +use sqlx_sqlite_observer::TableChangeEvent; +# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig}; +# async fn example(observer: SqliteObserver) { + +let mut stream = observer.subscribe_stream(["users"]); + +while let Some(event) = stream.next().await { + match event { + TableChangeEvent::Change(change) => { + // Process the change normally + } + TableChangeEvent::Lagged(n) => { + // n notifications were missed — local state may be stale. + // Re-query the database for current state. + tracing::warn!("Missed {} change notifications", n); + } + } +} +# } +``` + +**When does lag happen?** The broadcast channel has a fixed capacity +(default 256). Lag occurs when the oldest unread messages are +overwritten. This can happen in two ways: + + * A subscriber processes changes slower than they arrive + * A single transaction contains more mutating statements than the + channel capacity, causing messages to be overwritten before the + consumer reads them + +This is rare under normal conditions but can occur during bulk +writes or large transactions. + +**How to prevent it:** + + * Increase `channel_capacity` via `ObserverConfig::with_channel_capacity` + * Process changes faster (avoid blocking in the stream consumer) + * Use a dedicated task for stream consumption + +**Note:** The `broadcast::Receiver` API (from `subscribe()`) surfaces +lag as `RecvError::Lagged(n)` — the same information, just through +the raw tokio broadcast channel interface rather than the stream. + ### Disabling Value Capture By default, `TableChange` includes `old_values` and `new_values` with the actual diff --git a/crates/sqlx-sqlite-observer/src/change.rs b/crates/sqlx-sqlite-observer/src/change.rs index 3887c6b..03020ec 100644 --- a/crates/sqlx-sqlite-observer/src/change.rs +++ b/crates/sqlx-sqlite-observer/src/change.rs @@ -100,6 +100,30 @@ impl ColumnValue { } } +/// Event yielded by [`TableChangeStream`](crate::stream::TableChangeStream). +/// +/// Most events are `Change` variants containing the actual table change data. +/// A `Lagged` event indicates the consumer fell behind and missed some +/// notifications — consider increasing +/// [`channel_capacity`](crate::config::ObserverConfig::channel_capacity). +#[derive(Debug, Clone)] +pub enum TableChangeEvent { + /// A table change notification. + Change(TableChange), + /// The stream fell behind and missed `n` change notifications. + /// + /// This can happen when: + /// - The consumer is processing changes too slowly relative to the + /// rate of database writes. + /// - A single transaction contains more mutating statements than the + /// channel capacity, causing older messages to be overwritten before + /// the consumer reads them. + /// + /// When this happens, the consumer should assume its local state may + /// be stale and re-query the database for the current state. + Lagged(u64), +} + /// Notification of a change to a database table. /// /// Contains the table name, operation type, affected rowid, and the diff --git a/crates/sqlx-sqlite-observer/src/lib.rs b/crates/sqlx-sqlite-observer/src/lib.rs index ac1f16d..c21bf30 100644 --- a/crates/sqlx-sqlite-observer/src/lib.rs +++ b/crates/sqlx-sqlite-observer/src/lib.rs @@ -81,7 +81,9 @@ //! ```rust,no_run //! use futures::StreamExt; //! use sqlx::SqlitePool; -//! use sqlx_sqlite_observer::{ChangeOperation, SqliteObserver, ObserverConfig, TableChangeStreamExt}; +//! use sqlx_sqlite_observer::{ +//! SqliteObserver, ObserverConfig, TableChangeEvent, TableChangeStreamExt, +//! }; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -92,20 +94,26 @@ //! // Get a Stream instead of broadcast::Receiver //! let mut stream = observer.subscribe_stream(["users"]); //! -//! // Use standard Stream combinators -//! while let Some(change) = stream.next().await { -//! println!( -//! "Table {} row {} was {:?}", -//! change.table, -//! change.rowid.unwrap_or(-1), -//! change.operation -//! ); -//! // Access typed column values -//! if let Some(old) = &change.old_values { -//! println!(" Old values: {:?}", old); -//! } -//! if let Some(new) = &change.new_values { -//! println!(" New values: {:?}", new); +//! // Stream yields TableChangeEvent variants +//! while let Some(event) = stream.next().await { +//! match event { +//! TableChangeEvent::Change(change) => { +//! println!( +//! "Table {} row {} was {:?}", +//! change.table, +//! change.rowid.unwrap_or(-1), +//! change.operation +//! ); +//! if let Some(old) = &change.old_values { +//! println!(" Old values: {:?}", old); +//! } +//! if let Some(new) = &change.new_values { +//! println!(" New values: {:?}", new); +//! } +//! } +//! TableChangeEvent::Lagged(n) => { +//! eprintln!("Missed {} notifications, re-query state", n); +//! } //! } //! } //! @@ -127,7 +135,7 @@ pub mod stream; pub mod conn_mgr; pub use broker::ObservationBroker; -pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; +pub use change::{ChangeOperation, ColumnValue, TableChange, TableChangeEvent, TableInfo}; pub use config::ObserverConfig; pub use connection::ObservableConnection; pub use error::Error; diff --git a/crates/sqlx-sqlite-observer/src/stream.rs b/crates/sqlx-sqlite-observer/src/stream.rs index 3e33525..f488048 100644 --- a/crates/sqlx-sqlite-observer/src/stream.rs +++ b/crates/sqlx-sqlite-observer/src/stream.rs @@ -6,7 +6,7 @@ use tokio_stream::Stream; use tokio_stream::wrappers::BroadcastStream; use tracing::warn; -use crate::change::TableChange; +use crate::change::{TableChange, TableChangeEvent}; /// A filtered stream of table change notifications. /// @@ -32,7 +32,7 @@ impl TableChangeStream { } impl Stream for TableChangeStream { - type Item = TableChange; + type Item = TableChangeEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -46,15 +46,17 @@ impl Stream for TableChangeStream { { continue; } - return Poll::Ready(Some(change)); + return Poll::Ready(Some(TableChangeEvent::Change(change))); } - Poll::Ready(Some(Err(err))) => { - // Lagged error - missed some messages due to slow consumption + Poll::Ready(Some(Err( + tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(count), + ))) => { warn!( - error = %err, - "Stream lagged — missed change notifications. Consider increasing channel_capacity." + missed = count, + "Stream lagged — missed change notifications. \ + Consider increasing channel_capacity." ); - continue; + return Poll::Ready(Some(TableChangeEvent::Lagged(count))); } Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => return Poll::Pending, diff --git a/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs index e431640..0951376 100644 --- a/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs +++ b/crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs @@ -331,6 +331,13 @@ async fn test_stream_receives_notifications() { let result = timeout(Duration::from_millis(100), stream.next()).await; assert!(result.is_ok(), "Stream receives notification"); - let change = result.unwrap().unwrap(); - assert_eq!(change.table, "users"); + let event = result.unwrap().unwrap(); + match event { + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + } + sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => { + panic!("Expected Change event, got Lagged"); + } + } } diff --git a/crates/sqlx-sqlite-observer/tests/integration_tests.rs b/crates/sqlx-sqlite-observer/tests/integration_tests.rs index f993be8..5a7c267 100644 --- a/crates/sqlx-sqlite-observer/tests/integration_tests.rs +++ b/crates/sqlx-sqlite-observer/tests/integration_tests.rs @@ -391,8 +391,15 @@ async fn test_stream_receives_notifications() { let result = timeout(Duration::from_millis(100), stream.next()).await; assert!(result.is_ok(), "Stream receives notification"); - let change = result.unwrap().unwrap(); - assert_eq!(change.table, "users"); + let event = result.unwrap().unwrap(); + match event { + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + } + sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => { + panic!("Expected Change event, got Lagged"); + } + } } #[tokio::test] @@ -424,6 +431,50 @@ async fn test_stream_filters_tables() { assert!(result.is_err(), "Stream filters out non-subscribed tables"); } +#[tokio::test] +async fn test_stream_lag_when_capacity_exceeded() { + let pool = setup_test_db().await; + let config = ObserverConfig::new() + .with_tables(["users"]) + .with_channel_capacity(2); + + let observer = SqliteObserver::new(pool, config); + + let mut stream = observer.subscribe_stream(["users"]); + let mut conn = observer.acquire().await.unwrap(); + + // Insert more rows than the channel capacity in a single transaction + sqlx::query("BEGIN").execute(&mut **conn).await.unwrap(); + for i in 0..5 { + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind(format!("User{}", i)) + .execute(&mut **conn) + .await + .unwrap(); + } + sqlx::query("COMMIT").execute(&mut **conn).await.unwrap(); + + let mut saw_lagged = false; + let mut saw_change = false; + + // Drain all available events + while let Ok(Some(event)) = timeout(Duration::from_millis(100), stream.next()).await { + match event { + sqlx_sqlite_observer::TableChangeEvent::Lagged(n) => { + assert!(n > 0, "Lagged count should be > 0"); + saw_lagged = true; + } + sqlx_sqlite_observer::TableChangeEvent::Change(change) => { + assert_eq!(change.table, "users"); + saw_change = true; + } + } + } + + assert!(saw_lagged, "Expected at least one Lagged event"); + assert!(saw_change, "Expected at least one Change event"); +} + // ============================================================================ // Value Capture // ============================================================================