diff --git a/Cargo.lock b/Cargo.lock index a8f8c45..7dc4804 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,26 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.10.0", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.111", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -288,6 +308,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfb" version = "0.7.3" @@ -333,6 +362,17 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.9", +] + [[package]] name = "combine" version = "4.6.7" @@ -1647,6 +1687,15 @@ dependencies = [ "serde", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1762,6 +1811,12 @@ dependencies = [ "spin", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libappindicator" version = "0.9.0" @@ -1782,7 +1837,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e9ec52138abedcc58dc17a7c6c0c00a2bdb4f3427c7f63fa97fd0d859155caf" dependencies = [ "gtk-sys", - "libloading", + "libloading 0.7.4", "once_cell", ] @@ -1802,6 +1857,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.1", +] + [[package]] name = "libm" version = "0.2.15" @@ -1825,6 +1890,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ + "bindgen", "cc", "pkg-config", "vcpkg", @@ -1925,6 +1991,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2009,6 +2081,16 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num-bigint-dig" version = "0.8.6" @@ -2878,9 +2960,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -2960,6 +3042,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.1" @@ -3619,6 +3707,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlx-sqlite-observer" +version = "0.8.6" +dependencies = [ + "libsqlite3-sys", + "parking_lot", + "regex", + "sqlx", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index caabc3f..9f85c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "3" members = [ "crates/sqlx-sqlite-conn-mgr", + "crates/sqlx-sqlite-observer", ] [package] diff --git a/crates/sqlx-sqlite-observer/Cargo.toml b/crates/sqlx-sqlite-observer/Cargo.toml new file mode 100644 index 0000000..c6c55e7 --- /dev/null +++ b/crates/sqlx-sqlite-observer/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sqlx-sqlite-observer" +# Sync major.minor with major.minor of SQLx crate +version = "0.8.6" +license = "MIT" +edition = "2024" +rust-version = "1.89" +authors = ["Jeremy Thomerson"] +description = "Reactive change notifications for SQLite databases using sqlx" +repository = "https://github.com/silvermine/tauri-plugin-sqlite" +readme = "README.md" +keywords = ["sqlite", "sqlx", "reactive", "observer", "database"] +categories = ["database", "asynchronous"] + +[features] +# Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK +# which most system SQLite libraries don't have enabled. +default = ["bundled"] +bundled = ["libsqlite3-sys/bundled"] + +[dependencies] +tokio = { version = "1.49.0", features = ["sync"] } +thiserror = "2.0.17" +tracing = { version = "0.1.44", default-features = false, features = ["std", "release_max_level_off"] } +parking_lot = "0.12.3" +regex = "1.12.3" +sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false } +# Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK +libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] } + +[dev-dependencies] +tokio = { version = "1.49.0", features = ["full", "macros"] } diff --git a/crates/sqlx-sqlite-observer/LICENSE b/crates/sqlx-sqlite-observer/LICENSE new file mode 100644 index 0000000..0ea13aa --- /dev/null +++ b/crates/sqlx-sqlite-observer/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Jeremy Thomerson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/crates/sqlx-sqlite-observer/README.md b/crates/sqlx-sqlite-observer/README.md new file mode 100644 index 0000000..2c13130 --- /dev/null +++ b/crates/sqlx-sqlite-observer/README.md @@ -0,0 +1,230 @@ +# SQLx SQLite Observer + +Reactive change notifications for SQLite databases using sqlx. + +This crate provides **transaction-safe** change notifications for SQLite databases +using SQLite's native hooks (`preupdate_hook`, `commit_hook`, `rollback_hook`). + +## Features + + * **Transaction-safe notifications**: Changes only notify after successful commit + * **Typed column values**: Access old/new values with native SQLite types + * **Stream support**: Use `tokio_stream::Stream` for async iteration + * **Multiple subscribers**: Broadcast channel supports multiple listeners + * **Optional SQLx SQLite Connection Manager integration**: Works with + `sqlx-sqlite-conn-mgr` for single-writer/multi-reader patterns + +## SQLite Requirements + +Requires SQLite compiled with `SQLITE_ENABLE_PREUPDATE_HOOK`. + +**Important:** Most system SQLite libraries do NOT have this option enabled by +default. You have two options: + +1. **Use the `bundled` feature** (recommended for most users): + + ```toml + sqlx-sqlite-observer = { version = "0.8", features = ["bundled"] } + ``` + + This compiles SQLite from source with preupdate hook support (~1MB binary size + increase). + +2. **Provide your own SQLite** with `SQLITE_ENABLE_PREUPDATE_HOOK` compiled in. + Use `is_preupdate_hook_enabled()` to verify at runtime. + +If preupdate hooks are not available, `SqliteObserver::acquire()` will return an +error with a descriptive message. + +## Installation + +Add to your `Cargo.toml`: + +```toml +[dependencies] +sqlx-sqlite-observer = "0.8" +``` + +For integration with `sqlx-sqlite-conn-mgr`: + +```toml +[dependencies] +sqlx-sqlite-observer = { version = "0.8", features = ["conn-mgr"] } +``` + +## How It Works + +The library uses SQLite's native hooks for transaction-safe change tracking: + +```text +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ preupdate_hook │────►│ broker.buffer │ │ subscribers │ +│ (captures data) │ │ (Vec) │ │ │ +└─────────────────┘ └────────┬────────┘ └─────────────────┘ + │ ▲ + ┌────────────┼────────────┐ │ + │ │ │ │ + ┌─────▼────┐ ┌────▼─────┐ │ │ + │ COMMIT │ │ ROLLBACK │ │ │ + └─────┬────┘ └────┬─────┘ │ │ + │ │ │ │ + ▼ ▼ │ │ + on_commit() on_rollback() │ │ + │ │ │ │ + │ buffer.clear() │ │ + │ (discard) │ │ + │ │ │ + └─────────────────────────┴──────────┘ + change_tx.send() + (publish) +``` + +1. When you acquire a connection, observation hooks are registered on the raw + SQLite handle +2. `preupdate_hook` captures changes (table, operation, old/new values) and + buffers them +3. `commit_hook` fires when a transaction commits, publishing buffered changes + to subscribers +4. `rollback_hook` fires when a transaction rolls back, discarding buffered + changes + +This ensures subscribers **only receive notifications for committed changes**. + +## API Reference + +### Core Types + + * **`TableChange`**: Notification of a change to a database table + * **`ChangeOperation`**: Insert, Update, or Delete + * **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob) + * **`ObserverConfig`**: Configuration for table filtering and channel + capacity + +### Observer Types + + * **`SqliteObserver`**: Main observer for `SqlitePool` connections + * **`ObservableConnection`**: Connection wrapper with hooks registered + +### Stream Types + + * **`TableChangeStream`**: Async stream of table changes + * **`TableChangeStreamExt`**: Extension trait for converting receivers to + streams + +### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) + + * **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation + * **`ObservableWriteGuard`**: Write guard with hooks registered + +### `TableInfo` + +Schema information for observed tables (used internally, also exported). + + * `pk_columns: Vec` - Column indices forming the primary key + * `without_rowid: bool` - Whether the table uses WITHOUT ROWID + +## Primary Key Extraction + +The `primary_key` field on `TableChange` always contains the actual primary key +value(s) for the affected row: + +```rust +let change = rx.recv().await?; + +// Single-column PK (e.g., INTEGER PRIMARY KEY) +if let Some(ColumnValue::Integer(id)) = change.primary_key.first() { + println!("Changed row id: {}", id); +} + +// Composite PK - values are in declaration order +for (i, pk_value) in change.primary_key.iter().enumerate() { + println!("PK column {}: {:?}", i, pk_value); +} +``` + +**Why `primary_key` instead of just `rowid`?** + +SQLite's internal `rowid` works well for tables with `INTEGER PRIMARY KEY`, but +has limitations: + + * **Text or UUID primary keys**: The `rowid` is an internal integer, not your + actual key + * **Composite primary keys**: The `rowid` doesn't represent your multi-column + key + * **WITHOUT ROWID tables**: The `rowid` from the preupdate hook is unreliable + +The `primary_key` field extracts the actual primary key values from the captured +column data, giving you meaningful identifiers regardless of table structure. + +### WITHOUT ROWID Tables + +For tables created with `WITHOUT ROWID`, the `rowid` field in `TableChange` will +be `None`: + +```rust +let change = rx.recv().await?; + +if change.rowid.is_none() { + // This is a WITHOUT ROWID table + // Use primary_key instead + println!("PK: {:?}", change.primary_key); +} +``` + +This is because SQLite's preupdate hook provides the first PRIMARY KEY column +(coerced to i64) as the "rowid" for WITHOUT ROWID tables, which may not be +meaningful/correct for non-integer or composite primary keys. + +## Examples + +> **Coming in Phase 2** - Full working examples will be added in a subsequent PR. + +### Basic Usage + + + +### Stream API + + + +### Value Capture + + + +### SQLx SQLite Connection Manager Integration + + + +## Usage Notes + +### Channel Capacity + +The `channel_capacity` in `ObserverConfig` determines how many changes can be +buffered. All changes in a transaction are delivered at once on commit. If your +transaction contains more mutating statements than this capacity, **messages +will be dropped**. + + + +```rust +let config = ObserverConfig::new() + .with_tables(["users", "posts"]) + .with_channel_capacity(1000); // Handle large transactions +``` + +### Disabling Value Capture + +By default, `TableChange` includes `old_values` and `new_values` with the actual +column data. Disable this for lower memory usage if you only need row IDs: + + + +```rust +let config = ObserverConfig::new() + .with_tables(["users"]) + .with_capture_values(false); // Only track table + rowid +``` + +## License + +MIT License - see [LICENSE](LICENSE) for details. diff --git a/crates/sqlx-sqlite-observer/src/broker.rs b/crates/sqlx-sqlite-observer/src/broker.rs new file mode 100644 index 0000000..81c1a85 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/broker.rs @@ -0,0 +1,263 @@ +//! Transaction-aware observation broker for buffering and publishing changes. +//! +//! This module provides transaction-safe change notifications. Changes are buffered +//! during transactions (explicit and implicit) and only published after successful +//! commit. Rolled-back transactions produce no notifications. +//! +//! # Data Flow +//! +//! ```text +//! ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +//! │ preupdate_hook │────►│ broker.buffer │ │ subscribers │ +//! │ (captures data) │ │ (Vec) │ │ │ +//! └─────────────────┘ └────────┬────────┘ └─────────────────┘ +//! │ ▲ +//! ┌────────────┼────────────┐ │ +//! │ │ │ │ +//! ┌─────▼────┐ ┌────▼─────┐ │ │ +//! │ COMMIT │ │ ROLLBACK │ │ │ +//! └─────┬────┘ └────┬─────┘ │ │ +//! │ │ │ │ +//! ▼ ▼ │ │ +//! on_commit() on_rollback() │ │ +//! │ │ │ │ +//! │ buffer.clear() │ │ +//! │ (discard) │ │ +//! │ │ │ +//! └─────────────────────────┴──────────┘ +//! change_tx.send() +//! (publish) +//! ``` +//! +//! Changes captured by the preupdate hook are buffered until the transaction +//! (explicit or implicit) completes. On commit, buffered changes are published +//! to subscribers. On rollback, they are discarded without notification. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::Instant; + +use parking_lot::{Mutex, RwLock}; +use tokio::sync::broadcast; +use tracing::{debug, error, trace}; + +use crate::change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; +use crate::hooks::{PreUpdateEvent, SqliteValue}; + +/// Transaction-aware observation broker. +/// +/// Buffers preupdate events during transactions and publishes them to +/// subscribers only after successful commit. Rolled-back transactions +/// have their buffered changes discarded. +pub struct ObservationBroker { + buffer: Mutex>, + change_tx: broadcast::Sender, + observed_tables: RwLock>, + table_info: RwLock>, + capture_values: bool, +} + +impl ObservationBroker { + /// Creates a new broker with the specified broadcast channel capacity. + pub fn new(channel_capacity: usize, capture_values: bool) -> Arc { + let (change_tx, _) = broadcast::channel(channel_capacity); + Arc::new(Self { + buffer: Mutex::new(Vec::new()), + change_tx, + observed_tables: RwLock::new(HashSet::new()), + table_info: RwLock::new(HashMap::new()), + capture_values, + }) + } + + /// Checks if a table is being observed. + pub fn is_table_observed(&self, table: &str) -> bool { + self.observed_tables.read().contains(table) + } + + /// Registers a table for observation with its schema information. + /// + /// Only changes to observed tables will be buffered and published. + /// The `TableInfo` is required to correctly extract primary key values + /// and determine whether the rowid is meaningful for the table. + pub fn observe_table(&self, table: &str, info: TableInfo) { + trace!( + table = %table, + pk_columns = ?info.pk_columns, + without_rowid = info.without_rowid, + "Observing table with schema info" + ); + self.observed_tables.write().insert(table.to_string()); + self.table_info.write().insert(table.to_string(), info); + } + + /// Gets the schema information for an observed table. + pub fn get_table_info(&self, table: &str) -> Option { + self.table_info.read().get(table).cloned() + } + + /// Returns a list of all observed tables. + pub fn get_observed_tables(&self) -> Vec { + self.observed_tables.read().iter().cloned().collect() + } + + /// Called by preupdate_hook - buffers the event for later processing. + /// + /// Events are held in the buffer until either `on_commit()` (publish) + /// or `on_rollback()` (discard) is called. + pub fn on_preupdate(&self, event: PreUpdateEvent) { + trace!( + table = %event.table, + operation = ?event.operation, + "Buffering preupdate event" + ); + self.buffer.lock().push(event); + } + + /// Called by commit_hook - flushes buffered events to subscribers. + /// + /// Converts all buffered `PreUpdateEvent`s to `TableChange`s and sends + /// them through the broadcast channel. The buffer is cleared afterward. + pub fn on_commit(&self) { + let events: Vec = { + let mut buffer = self.buffer.lock(); + std::mem::take(&mut *buffer) + }; + + if events.is_empty() { + return; + } + + debug!(count = events.len(), "Flushing buffered changes on commit"); + + for event in events { + match self.event_to_change(event) { + Ok(table_change) => { + let _ = self.change_tx.send(table_change); + } + Err(e) => { + error!(error = %e, "Failed to convert event to change"); + } + } + } + } + + /// Called by rollback_hook - discards all buffered events. + /// + /// Clears the buffer without publishing any changes to subscribers. + pub fn on_rollback(&self) { + let count = { + let mut buffer = self.buffer.lock(); + let count = buffer.len(); + buffer.clear(); + count + }; + + if count > 0 { + debug!(count, "Discarding buffered changes on rollback"); + } + } + + /// Subscribes to change notifications. + /// + /// Returns a broadcast receiver that will receive `TableChange` events + /// after transactions commit. + pub fn subscribe(&self) -> broadcast::Receiver { + self.change_tx.subscribe() + } + + /// Converts a PreUpdateEvent to a TableChange for broadcast. + fn event_to_change(&self, event: PreUpdateEvent) -> crate::Result { + let table_info = self.table_info.read().get(&event.table).cloned(); + + // For WITHOUT ROWID tables, the rowid from preupdate hook is not meaningful + let rowid = match &table_info { + Some(info) if info.without_rowid => None, + _ => match event.operation { + ChangeOperation::Insert => Some(event.new_rowid), + ChangeOperation::Delete => Some(event.old_rowid), + ChangeOperation::Update => Some(event.new_rowid), + }, + }; + + // Extract primary key values from the appropriate column values + let primary_key = self.extract_primary_key(&event, table_info.as_ref())?; + + let (old_values, new_values) = if self.capture_values { + ( + event.old_values.map(Self::values_to_vec), + event.new_values.map(Self::values_to_vec), + ) + } else { + (None, None) + }; + + Ok(TableChange { + table: event.table, + operation: Some(event.operation), + rowid, + primary_key, + old_values, + new_values, + timestamp: Instant::now(), + }) + } + + /// Extracts primary key values from the event based on table schema. + /// + /// Returns an error if the schema has drifted (e.g., table was altered) + /// and PK column indices are out of bounds. + fn extract_primary_key( + &self, + event: &PreUpdateEvent, + table_info: Option<&TableInfo>, + ) -> crate::Result> { + let Some(info) = table_info else { + return Ok(Vec::new()); + }; + + if info.pk_columns.is_empty() { + return Ok(Vec::new()); + } + + // For DELETE, use old values; for INSERT/UPDATE, use new values + let values = match event.operation { + ChangeOperation::Delete => event.old_values.as_ref(), + ChangeOperation::Insert | ChangeOperation::Update => event.new_values.as_ref(), + }; + + let Some(values) = values else { + return Ok(Vec::new()); + }; + + // Extract values at the PK column indices, erroring if any index is out of bounds + let mut pk_values = Vec::with_capacity(info.pk_columns.len()); + for &idx in &info.pk_columns { + match values.get(idx) { + Some(v) => pk_values.push(v.clone().into()), + None => { + return Err(crate::Error::SchemaMismatch { + table: event.table.clone(), + expected: info.pk_columns.len(), + actual: values.len(), + }); + } + } + } + Ok(pk_values) + } + + /// Converts SqliteValue vec to ColumnValue vec for TableChange. + fn values_to_vec(values: Vec) -> Vec { + values.into_iter().map(|v| v.into()).collect() + } +} + +impl std::fmt::Debug for ObservationBroker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ObservationBroker") + .field("buffer_len", &self.buffer.lock().len()) + .field("observed_tables", &self.observed_tables.read().len()) + .finish() + } +} diff --git a/crates/sqlx-sqlite-observer/src/change.rs b/crates/sqlx-sqlite-observer/src/change.rs new file mode 100644 index 0000000..3887c6b --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/change.rs @@ -0,0 +1,127 @@ +use std::time::Instant; + +use crate::hooks::SqliteValue; + +/// Schema information for an observed table. +/// +/// Used to extract primary key values and determine if rowid is meaningful. +#[derive(Debug, Clone, Default)] +pub struct TableInfo { + /// Column indices that form the primary key, in declaration order. + /// For `INTEGER PRIMARY KEY` tables, this contains the single PK column index. + /// For composite PKs, indices are ordered as declared in the schema. + pub pk_columns: Vec, + /// True if the table was created with `WITHOUT ROWID`. + /// For such tables, the preupdate hook's rowid parameter contains the first column + /// of the PRIMARY KEY (coerced to i64), which may not be meaningful/correct for + /// non-integer or composite primary keys. + pub without_rowid: bool, +} + +impl TableInfo { + /// Creates a new TableInfo with the given PK column indices. + pub fn new(pk_columns: Vec, without_rowid: bool) -> Self { + Self { + pk_columns, + without_rowid, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ChangeOperation { + Insert, + Update, + Delete, +} + +/// Typed column value from SQLite. +/// +/// Represents a single column's value with its native SQLite type. +/// This replaces the previous JSON string representation for better +/// type safety and performance. +#[derive(Debug, Clone, PartialEq)] +pub enum ColumnValue { + Null, + Integer(i64), + Real(f64), + Text(String), + Blob(Vec), +} + +impl From for ColumnValue { + fn from(value: SqliteValue) -> Self { + match value { + SqliteValue::Null => ColumnValue::Null, + SqliteValue::Integer(i) => ColumnValue::Integer(i), + SqliteValue::Real(r) => ColumnValue::Real(r), + SqliteValue::Text(s) => ColumnValue::Text(s), + SqliteValue::Blob(b) => ColumnValue::Blob(b), + } + } +} + +impl ColumnValue { + /// Returns true if this value is null. + pub fn is_null(&self) -> bool { + matches!(self, ColumnValue::Null) + } + + /// Attempts to get this value as an integer. + pub fn as_integer(&self) -> Option { + match self { + ColumnValue::Integer(i) => Some(*i), + _ => None, + } + } + + /// Attempts to get this value as a float. + pub fn as_real(&self) -> Option { + match self { + ColumnValue::Real(r) => Some(*r), + _ => None, + } + } + + /// Attempts to get this value as a string reference. + pub fn as_text(&self) -> Option<&str> { + match self { + ColumnValue::Text(s) => Some(s), + _ => None, + } + } + + /// Attempts to get this value as a blob reference. + pub fn as_blob(&self) -> Option<&[u8]> { + match self { + ColumnValue::Blob(b) => Some(b), + _ => None, + } + } +} + +/// Notification of a change to a database table. +/// +/// Contains the table name, operation type, affected rowid, and the +/// old/new column values (when available). Changes are only sent after +/// the transaction commits successfully. +#[derive(Debug, Clone)] +pub struct TableChange { + pub table: String, + pub operation: Option, + /// The SQLite internal rowid. This is `None` for WITHOUT ROWID tables + /// since the preupdate hook's rowid parameter is not meaningful for them. + pub rowid: Option, + /// The primary key value(s) for the affected row. + /// For composite primary keys, values are ordered by their declaration order. + /// For DELETE operations, this contains the old PK values. + /// For INSERT/UPDATE operations, this contains the new PK values. + pub primary_key: Vec, + /// Column values before the change (for UPDATE and DELETE). + /// Values are ordered by column index as defined in the table schema. + pub old_values: Option>, + /// Column values after the change (for INSERT and UPDATE). + /// Values are ordered by column index as defined in the table schema. + pub new_values: Option>, + pub timestamp: Instant, +} diff --git a/crates/sqlx-sqlite-observer/src/error.rs b/crates/sqlx-sqlite-observer/src/error.rs new file mode 100644 index 0000000..06ad3c8 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/error.rs @@ -0,0 +1,23 @@ +//! Error types for the sqlx-sqlite-observer crate. + +/// Errors that can occur during observation operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to register SQLite hooks. + #[error("Hook registration failed: {0}")] + HookRegistration(String), + + /// SQLx database error. + #[error("Database error: {0}")] + Sqlx(#[from] sqlx::Error), + + /// Schema mismatch - table schema changed while observing. + #[error( + "Schema mismatch for table '{table}': expected {expected} PK columns, but only {actual} values available" + )] + SchemaMismatch { + table: String, + expected: usize, + actual: usize, + }, +} diff --git a/crates/sqlx-sqlite-observer/src/hooks.rs b/crates/sqlx-sqlite-observer/src/hooks.rs new file mode 100644 index 0000000..58f1cca --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/hooks.rs @@ -0,0 +1,375 @@ +//! SQLite native hook registration for support observing changes to the database. +//! +//! This module provides low-level bindings to SQLite's preupdate_hook, commit_hook, +//! and rollback_hook APIs for transaction-aware change tracking. +//! +//! # SQLite Requirements +//! +//! The preupdate hook requires SQLite compiled with `SQLITE_ENABLE_PREUPDATE_HOOK`. +//! Use [`is_preupdate_hook_enabled()`] to check at runtime whether the linked +//! SQLite library supports this feature. + +use std::ffi::{CStr, CString, c_int, c_void}; +use std::panic::catch_unwind; +use std::ptr; +use std::sync::Arc; + +use libsqlite3_sys::{ + SQLITE_BLOB, SQLITE_DELETE, SQLITE_FLOAT, SQLITE_INSERT, SQLITE_INTEGER, SQLITE_NULL, + SQLITE_TEXT, SQLITE_UPDATE, sqlite3, sqlite3_commit_hook, sqlite3_compileoption_used, + sqlite3_preupdate_count, sqlite3_preupdate_hook, sqlite3_preupdate_new, sqlite3_preupdate_old, + sqlite3_rollback_hook, sqlite3_value, sqlite3_value_blob, sqlite3_value_bytes, + sqlite3_value_double, sqlite3_value_int64, sqlite3_value_text, sqlite3_value_type, +}; +use tracing::{debug, error, trace}; + +use crate::broker::ObservationBroker; +use crate::change::ChangeOperation; + +/// A SQLite value extracted from preupdate hooks. +/// +/// Represents the typed value of a column before or after a change operation. +#[derive(Debug, Clone, PartialEq)] +pub enum SqliteValue { + Null, + Integer(i64), + Real(f64), + Text(String), + Blob(Vec), +} + +impl SqliteValue { + /// Extracts a value from a raw sqlite3_value pointer. + /// + /// # Safety + /// + /// The pointer must be valid and point to a properly initialized sqlite3_value. + unsafe fn from_raw(value: *mut sqlite3_value) -> Self { + if value.is_null() { + return SqliteValue::Null; + } + + // SAFETY: value is non-null and valid for the duration of the preupdate hook callback. + // SQLite guarantees the sqlite3_value pointer is valid until the callback returns. + match unsafe { sqlite3_value_type(value) } { + SQLITE_NULL => SqliteValue::Null, + SQLITE_INTEGER => SqliteValue::Integer(unsafe { sqlite3_value_int64(value) }), + SQLITE_FLOAT => SqliteValue::Real(unsafe { sqlite3_value_double(value) }), + SQLITE_TEXT => { + let text_ptr = unsafe { sqlite3_value_text(value) }; + if text_ptr.is_null() { + SqliteValue::Null + } else { + // SAFETY: SQLite guarantees text is valid UTF-8 with a null terminator + let cstr = unsafe { CStr::from_ptr(text_ptr as *const i8) }; + SqliteValue::Text(cstr.to_string_lossy().into_owned()) + } + } + SQLITE_BLOB => { + let blob_ptr = unsafe { sqlite3_value_blob(value) }; + let len = unsafe { sqlite3_value_bytes(value) } as usize; + if blob_ptr.is_null() || len == 0 { + SqliteValue::Blob(Vec::new()) + } else { + // SAFETY: blob_ptr is non-null and len bytes are valid for the callback duration + let slice = unsafe { std::slice::from_raw_parts(blob_ptr as *const u8, len) }; + SqliteValue::Blob(slice.to_vec()) + } + } + _ => SqliteValue::Null, + } + } +} + +/// Raw change event captured by the preupdate hook before commit decision. +#[derive(Debug, Clone)] +pub struct PreUpdateEvent { + pub table: String, + pub operation: ChangeOperation, + pub old_rowid: i64, + pub new_rowid: i64, + pub old_values: Option>, + pub new_values: Option>, +} + +/// Context data passed to SQLite hook callbacks. +/// +/// Stored as user_data pointer in SQLite hooks. The Arc ensures the broker +/// stays alive as long as hooks are registered. +struct HookContext { + broker: Arc, +} + +/// Checks if the linked SQLite library was compiled with `SQLITE_ENABLE_PREUPDATE_HOOK`. +/// +/// Returns `true` if preupdate hooks are supported, `false` otherwise. +/// This should be checked before attempting to use observation features. +/// +/// # Example +/// +/// ```no_run +/// use sqlx_sqlite_observer::is_preupdate_hook_enabled; +/// +/// if !is_preupdate_hook_enabled() { +/// panic!("SQLite was not compiled with SQLITE_ENABLE_PREUPDATE_HOOK"); +/// } +/// ``` +pub fn is_preupdate_hook_enabled() -> bool { + let opt_name = CString::new("ENABLE_PREUPDATE_HOOK").expect("CString::new failed"); + unsafe { sqlite3_compileoption_used(opt_name.as_ptr()) == 1 } +} + +/// Registers all observation hooks on a raw SQLite connection. +/// +/// Hooks are automatically cleaned up by SQLite when the connection is closed, +/// either explicitly or when the connection exceeds the sqlx pool's `idle_timeout`. +/// +/// # Safety +/// +/// - `db` must be a valid pointer to an open sqlite3 connection +/// - The broker must outlive the connection (ensured by Arc) +/// - Must be called from the same thread that owns the connection, or +/// the connection must be in serialized threading mode +/// +/// # Errors +/// +/// Returns an error if preupdate hooks are not supported by the linked SQLite +/// library, or if the hooks cannot be registered. +pub unsafe fn register_hooks( + db: *mut sqlite3, + broker: Arc, +) -> crate::Result<()> { + // Check at runtime if preupdate hook is supported + if !is_preupdate_hook_enabled() { + return Err(crate::Error::HookRegistration( + "SQLite was not compiled with SQLITE_ENABLE_PREUPDATE_HOOK. \ + Ensure you're using a SQLite build with preupdate hook support, \ + or enable the 'bundled' feature on libsqlite3-sys." + .to_string(), + )); + } + + debug!("Registering SQLite observation hooks"); + + // Heap-allocate the context so it outlives this function. SQLite's C API + // requires a raw pointer to pass user data to callbacks. + let context = Box::new(HookContext { broker }); + // Transfer ownership out of Rust's memory management. + // + // NOTE: This pointer is shared across all three hooks and is intentionally + // leaked. SQLite does NOT free user_data - it simply passes the pointer back + // to callbacks. The memory is reclaimed when hooks are replaced via + // `unregister_hooks`, which reconstructs the Box from the raw pointer returned + // by `sqlite3_preupdate_hook`. If hooks are never explicitly unregistered, + // the memory lives until the process exits (acceptable for long-lived + // connections where the count is bounded). + let context_ptr = Box::into_raw(context) as *mut c_void; + + // SAFETY: db is a valid sqlite3 pointer (guaranteed by caller). + // Each hook receives the same context_ptr, which remains valid until + // unregister_hooks is called or the process exits. + unsafe { + sqlite3_preupdate_hook(db, Some(preupdate_callback), context_ptr); + sqlite3_commit_hook(db, Some(commit_callback), context_ptr); + sqlite3_rollback_hook(db, Some(rollback_callback), context_ptr); + } + + trace!("SQLite hooks registered successfully"); + Ok(()) +} + +/// Unregisters all observation hooks and reclaims the context memory. +/// +/// # Safety +/// +/// - `db` must be the same valid sqlite3 pointer passed to `register_hooks` +/// - Must only be called once per `register_hooks` call +/// - Must not be called concurrently with hook callbacks +pub unsafe fn unregister_hooks(db: *mut sqlite3) { + // SAFETY: Passing null callback and null user_data removes the hook. + // sqlite3_preupdate_hook returns the previous user_data pointer, which + // we use to reclaim the Box we leaked in register_hooks. + let prev_user_data = unsafe { sqlite3_preupdate_hook(db, None, ptr::null_mut()) }; + unsafe { + sqlite3_commit_hook(db, None, ptr::null_mut()); + sqlite3_rollback_hook(db, None, ptr::null_mut()); + } + + // Reclaim the HookContext we leaked in register_hooks + if !prev_user_data.is_null() { + // SAFETY: prev_user_data was created by Box::into_raw in register_hooks + let _ = unsafe { Box::from_raw(prev_user_data as *mut HookContext) }; + trace!("SQLite hooks unregistered and context freed"); + } +} + +/// Preupdate hook callback - captures changes before they're committed. +/// +/// Called by SQLite for INSERT, UPDATE, and DELETE operations. Captures old/new +/// row values and buffers them in the broker until commit or rollback. +/// +/// Note: `user_data` is SQLite's C API term for callback context (our HookContext), +/// unrelated to our app's user data. +unsafe extern "C" fn preupdate_callback( + user_data: *mut c_void, + db: *mut sqlite3, + op: c_int, + _database: *const i8, + table: *const i8, + old_rowid: i64, + new_rowid: i64, +) { + if user_data.is_null() || table.is_null() { + return; + } + + // Catch any panics to prevent unwinding across the FFI boundary (which is UB). + let result = catch_unwind(|| { + // SAFETY: user_data is a valid HookContext pointer created in register_hooks + // and remains valid until unregister_hooks is called. + let context = unsafe { &*(user_data as *const HookContext) }; + + // SAFETY: table is a non-null C string provided by SQLite, valid for this callback. + let table_name = match unsafe { CStr::from_ptr(table) }.to_str() { + Ok(s) => s.to_string(), + Err(_) => return, + }; + + // Check if this table is being observed + if !context.broker.is_table_observed(&table_name) { + return; + } + + let operation = match op { + SQLITE_INSERT => ChangeOperation::Insert, + SQLITE_UPDATE => ChangeOperation::Update, + SQLITE_DELETE => ChangeOperation::Delete, + _ => return, + }; + + trace!(table = %table_name, ?operation, old_rowid, new_rowid, "Preupdate hook fired"); + + // SAFETY: db is a valid sqlite3 pointer provided by SQLite for this callback. + let column_count = unsafe { sqlite3_preupdate_count(db) }; + if column_count < 0 { + error!("Failed to get column count in preupdate hook"); + return; + } + let column_count = column_count as usize; + + // Capture old values (for UPDATE and DELETE) + let old_values = if matches!(operation, ChangeOperation::Update | ChangeOperation::Delete) { + let mut values = Vec::with_capacity(column_count); + for i in 0..column_count { + let mut value: *mut sqlite3_value = ptr::null_mut(); + // SAFETY: db is valid, i is in range [0, column_count) + if unsafe { sqlite3_preupdate_old(db, i as c_int, &mut value) } == 0 { + // SAFETY: value was populated by sqlite3_preupdate_old + values.push(unsafe { SqliteValue::from_raw(value) }); + } else { + values.push(SqliteValue::Null); + } + } + Some(values) + } else { + None + }; + + // Capture new values (for INSERT and UPDATE) + let new_values = if matches!(operation, ChangeOperation::Insert | ChangeOperation::Update) { + let mut values = Vec::with_capacity(column_count); + for i in 0..column_count { + let mut value: *mut sqlite3_value = ptr::null_mut(); + // SAFETY: db is valid, i is in range [0, column_count) + if unsafe { sqlite3_preupdate_new(db, i as c_int, &mut value) } == 0 { + // SAFETY: value was populated by sqlite3_preupdate_new + values.push(unsafe { SqliteValue::from_raw(value) }); + } else { + values.push(SqliteValue::Null); + } + } + Some(values) + } else { + None + }; + + let event = PreUpdateEvent { + table: table_name, + operation, + old_rowid, + new_rowid, + old_values, + new_values, + }; + + context.broker.on_preupdate(event); + }); + + if result.is_err() { + // Cannot use tracing here since it may have been the source of the panic. + // The best we can do is silently absorb it to prevent UB. + eprintln!("sqlx-sqlite-observer: panic in preupdate_callback (absorbed to prevent UB)"); + } +} + +/// Commit hook callback - flushes buffered changes to subscribers. +/// +/// Called by SQLite when a transaction is about to commit. Returning 0 allows +/// the commit to proceed; returning non-zero would cause a rollback. +/// +/// Note: `user_data` is SQLite's C API term for callback context (our HookContext), +/// unrelated to application-level user data. +unsafe extern "C" fn commit_callback(user_data: *mut c_void) -> c_int { + if user_data.is_null() { + return 0; + } + + // Catch any panics to prevent unwinding across the FFI boundary (which is UB). + let result = catch_unwind(|| { + // SAFETY: user_data is a valid HookContext pointer created in register_hooks. + let context = unsafe { &*(user_data as *const HookContext) }; + trace!("Commit hook fired - flushing changes"); + context.broker.on_commit(); + }); + + if result.is_err() { + eprintln!("sqlx-sqlite-observer: panic in commit_callback (absorbed to prevent UB)"); + } + + 0 // Allow commit to proceed +} + +/// Rollback hook callback - discards buffered changes. +/// +/// Called by SQLite when a transaction is rolled back. +/// +/// Note: `user_data` is SQLite's C API term for callback context (our HookContext), +/// unrelated to application-level user data. +unsafe extern "C" fn rollback_callback(user_data: *mut c_void) { + if user_data.is_null() { + return; + } + + // Catch any panics to prevent unwinding across the FFI boundary (which is UB). + let result = catch_unwind(|| { + // SAFETY: user_data is a valid HookContext pointer created in register_hooks. + let context = unsafe { &*(user_data as *const HookContext) }; + trace!("Rollback hook fired - discarding changes"); + context.broker.on_rollback(); + }); + + if result.is_err() { + eprintln!("sqlx-sqlite-observer: panic in rollback_callback (absorbed to prevent UB)"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sqlite_value_from_null() { + let value = unsafe { SqliteValue::from_raw(ptr::null_mut()) }; + assert_eq!(value, SqliteValue::Null); + } +} diff --git a/crates/sqlx-sqlite-observer/src/lib.rs b/crates/sqlx-sqlite-observer/src/lib.rs new file mode 100644 index 0000000..5320a63 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/lib.rs @@ -0,0 +1,37 @@ +//! Reactive change notifications for SQLite databases using sqlx. +//! +//! This crate provides **transaction-safe** change notifications for SQLite databases +//! using SQLite's native hooks (`preupdate_hook`, `commit_hook`, `rollback_hook`). +//! +//! # SQLite Requirements +//! +//! Requires SQLite compiled with `SQLITE_ENABLE_PREUPDATE_HOOK`. +//! +//! **Important:** Most system SQLite libraries do NOT have this option enabled by default. +//! You have two options: +//! +//! 1. **Use the `bundled` feature** (recommended for most users): +//! ```toml +//! sqlx-sqlite-observer = { version = "0.8", features = ["bundled"] } +//! ``` +//! This compiles SQLite from source with preupdate hook support (~1MB binary size increase). +//! +//! 2. **Provide your own SQLite** with `SQLITE_ENABLE_PREUPDATE_HOOK` compiled in. +//! Use [`is_preupdate_hook_enabled()`] to verify at runtime. +//! +//! # Status +//! +//! This crate is in early development. See the README for the planned API. + +pub mod broker; +pub mod change; +pub mod error; +pub mod hooks; +pub mod schema; + +pub use broker::ObservationBroker; +pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo}; +pub use error::Error; +pub use hooks::{SqliteValue, is_preupdate_hook_enabled, unregister_hooks}; + +pub type Result = std::result::Result; diff --git a/crates/sqlx-sqlite-observer/src/schema.rs b/crates/sqlx-sqlite-observer/src/schema.rs new file mode 100644 index 0000000..8615514 --- /dev/null +++ b/crates/sqlx-sqlite-observer/src/schema.rs @@ -0,0 +1,168 @@ +//! Schema introspection utilities for SQLite tables. +//! +//! Provides functions to query table schema information needed for +//! primary key extraction and WITHOUT ROWID detection. + +use regex::Regex; +use sqlx::{Row, SqliteConnection}; +use std::sync::OnceLock; + +use crate::change::TableInfo; + +/// Queries the schema information for a table. +/// +/// Returns `TableInfo` containing primary key column indices and WITHOUT ROWID status. +/// Returns `None` if the table doesn't exist. +pub async fn query_table_info( + conn: &mut SqliteConnection, + table_name: &str, +) -> crate::Result> { + // Check if table exists and get WITHOUT ROWID status + let without_rowid = is_without_rowid(conn, table_name).await?; + + // Get primary key columns using PRAGMA table_info + let pk_columns = query_pk_columns(conn, table_name).await?; + + // Determine if table exists: + // - If pk_columns is None, PRAGMA table_info returned no rows (table doesn't exist) + // - If without_rowid is true, the table must exist (we found it in sqlite_master) + // - A table with no explicit PK returns Some([]), not None + if pk_columns.is_none() && !without_rowid { + return Ok(None); + } + + Ok(Some(TableInfo::new( + pk_columns.unwrap_or_default(), + without_rowid, + ))) +} + +/// Checks if a table was created with WITHOUT ROWID. +/// +/// Uses a regex anchored to the end of the CREATE TABLE statement to avoid +/// false positives from string literals or comments containing "WITHOUT ROWID". +async fn is_without_rowid(conn: &mut SqliteConnection, table_name: &str) -> crate::Result { + let sql = r#" + SELECT sql FROM sqlite_master + WHERE type = 'table' AND name = ?1 + "#; + + let row: Option<(Option,)> = sqlx::query_as(sql) + .bind(table_name) + .fetch_optional(&mut *conn) + .await + .map_err(crate::Error::Sqlx)?; + + match row { + Some((Some(create_sql),)) => Ok(has_without_rowid_clause(&create_sql)), + _ => Ok(false), + } +} + +/// Checks if a CREATE TABLE statement ends with WITHOUT ROWID. +/// +/// The regex matches "WITHOUT ROWID" only when it appears at the end of the +/// statement (after the closing parenthesis), avoiding false matches in +/// string literals or comments. +fn has_without_rowid_clause(create_sql: &str) -> bool { + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| { + // Match WITHOUT ROWID after ) with optional whitespace, case-insensitive + Regex::new(r"(?i)\)\s*WITHOUT\s+ROWID\s*$").expect("invalid regex") + }); + re.is_match(create_sql) +} + +/// Queries the primary key column indices for a table. +/// +/// Returns column indices in the order they appear in the PRIMARY KEY definition. +/// For composite primary keys, the `pk` column in PRAGMA table_info indicates +/// the position (1-indexed) within the PK. +async fn query_pk_columns( + conn: &mut SqliteConnection, + table_name: &str, +) -> crate::Result>> { + // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk + // pk is 0 for non-PK columns, or 1-indexed position for PK columns + let pragma = format!("PRAGMA table_info({})", quote_identifier(table_name)); + + let rows = sqlx::query(&pragma) + .fetch_all(&mut *conn) + .await + .map_err(crate::Error::Sqlx)?; + + if rows.is_empty() { + return Ok(None); // Table doesn't exist + } + + // Collect (cid, pk_position) for columns that are part of the PK + let mut pk_columns: Vec<(usize, i32)> = rows + .iter() + .filter_map(|row| { + let cid: i32 = row.get("cid"); + let pk: i32 = row.get("pk"); + if pk > 0 { + Some((cid as usize, pk)) + } else { + None + } + }) + .collect(); + + // Sort by pk position to get correct order for composite PKs + pk_columns.sort_by_key(|(_, pk_pos)| *pk_pos); + + // Return just the column indices + Ok(Some(pk_columns.into_iter().map(|(cid, _)| cid).collect())) +} + +/// Quotes a SQLite identifier to prevent SQL injection. +fn quote_identifier(name: &str) -> String { + // Double any existing double quotes and wrap in double quotes + format!("\"{}\"", name.replace('"', "\"\"")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_quote_identifier() { + assert_eq!(quote_identifier("users"), "\"users\""); + assert_eq!(quote_identifier("my table"), "\"my table\""); + assert_eq!(quote_identifier("foo\"bar"), "\"foo\"\"bar\""); + } + + #[test] + fn test_has_without_rowid_clause() { + // Positive cases + assert!(has_without_rowid_clause( + "CREATE TABLE t (id TEXT PRIMARY KEY) WITHOUT ROWID" + )); + assert!(has_without_rowid_clause( + "CREATE TABLE t (id TEXT PRIMARY KEY) WITHOUT ROWID " + )); + assert!(has_without_rowid_clause( + "CREATE TABLE t (id TEXT PRIMARY KEY) WITHOUT ROWID" + )); + assert!(has_without_rowid_clause( + "CREATE TABLE t (id TEXT PRIMARY KEY) without rowid" + )); + assert!(has_without_rowid_clause( + "CREATE TABLE t (id TEXT PRIMARY KEY)\nWITHOUT ROWID" + )); + + // Negative cases - normal tables + assert!(!has_without_rowid_clause( + "CREATE TABLE t (id INTEGER PRIMARY KEY)" + )); + + // Negative cases - false positive prevention + assert!(!has_without_rowid_clause( + "CREATE TABLE t (note TEXT DEFAULT 'see WITHOUT ROWID docs')" + )); + assert!(!has_without_rowid_clause( + "CREATE TABLE t (id INT, note TEXT) -- WITHOUT ROWID comment" + )); + } +}