Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sqlx-sqlite-observer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["sqlite", "sqlx", "reactive", "observer", "database"]
categories = ["database", "asynchronous"]

[features]
conn-mgr = ["dep:sqlx-sqlite-conn-mgr"]
# Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK
# which most system SQLite libraries don't have enabled.
default = ["bundled"]
Expand All @@ -28,6 +29,7 @@ regex = "1.12.3"
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false }
# Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK
libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] }
sqlx-sqlite-conn-mgr = { version = "0.8.6", optional = true }

[dev-dependencies]
tokio = { version = "1.49.0", features = ["full", "macros"] }
Expand Down
119 changes: 106 additions & 13 deletions crates/sqlx-sqlite-observer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ This ensures subscribers **only receive notifications for committed changes**.
### Core Types

* **`TableChange`**: Notification of a change to a database table
* **`TableChangeEvent`**: Event yielded by `TableChangeStream` —
either `Change(TableChange)` or `Lagged(u64)`
* **`ChangeOperation`**: Insert, Update, or Delete
* **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob)
* **`ObserverConfig`**: Configuration for table filtering and channel
Expand All @@ -111,7 +113,7 @@ This ensures subscribers **only receive notifications for committed changes**.
* **`TableChangeStreamExt`**: Extension trait for converting receivers to
streams

### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) <!-- COMING SOON -->
### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`)

* **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation
* **`ObservableWriteGuard`**: Write guard with hooks registered
Expand Down Expand Up @@ -179,7 +181,7 @@ meaningful/correct for non-integer or composite primary keys.

### Basic Usage

```rust,no_run
```rust
use sqlx::SqlitePool;
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};

Expand Down Expand Up @@ -219,10 +221,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

### Stream API

```rust,no_run
```rust
use futures::StreamExt;
use sqlx::SqlitePool;
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
use sqlx_sqlite_observer::{
SqliteObserver, ObserverConfig, TableChangeEvent,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -232,13 +236,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut stream = observer.subscribe_stream(["users"]);

while let Some(change) = stream.next().await {
println!(
"Table {} row {} was {:?}",
change.table,
change.rowid.unwrap_or(-1),
change.operation
);
while let Some(event) = stream.next().await {
match event {
TableChangeEvent::Change(change) => {
println!(
"Table {} row {} was {:?}",
change.table,
change.rowid.unwrap_or(-1),
change.operation
);
}
TableChangeEvent::Lagged(n) => {
eprintln!("Missed {} notifications", n);
}
}
}

Ok(())
Expand All @@ -247,7 +258,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

### Value Capture

```rust,no_run
```rust
use sqlx::SqlitePool;
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};

Expand Down Expand Up @@ -284,7 +295,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

### SQLx SQLite Connection Manager Integration

<!-- TODO: Add example showing ObservableSqliteDatabase usage -->
```rust
use std::sync::Arc;
use sqlx_sqlite_conn_mgr::SqliteDatabase;
use sqlx_sqlite_observer::{
ObservableSqliteDatabase, ObserverConfig,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = SqliteDatabase::connect("mydb.db", None).await?;
let config = ObserverConfig::new().with_tables(["users"]);
let observable = ObservableSqliteDatabase::new(db, config);

let mut rx = observable.subscribe(["users"]);

// Write through the observable writer
let mut writer = observable.acquire_writer().await?;
sqlx::query("BEGIN").execute(&mut *writer).await?;
sqlx::query("INSERT INTO users (name) VALUES (?)")
.bind("Alice")
.execute(&mut *writer)
.await?;
sqlx::query("COMMIT").execute(&mut *writer).await?;

// Notification arrives after commit
let change = rx.recv().await?;
println!("Changed: {}", change.table);

Ok(())
}
```

## Usage Notes

Expand All @@ -301,6 +342,58 @@ let config = ObserverConfig::new()
.with_channel_capacity(1000); // Handle large transactions
```

### Handling Lag

When using the Stream API, the stream yields `TableChangeEvent` values.
Most events are `Change` variants, but if a consumer falls behind, the
stream yields a `Lagged(n)` event indicating how many notifications
were missed.

```rust
use futures::StreamExt;
use sqlx_sqlite_observer::TableChangeEvent;
# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
# async fn example(observer: SqliteObserver) {

let mut stream = observer.subscribe_stream(["users"]);

while let Some(event) = stream.next().await {
match event {
TableChangeEvent::Change(change) => {
// Process the change normally
}
TableChangeEvent::Lagged(n) => {
// n notifications were missed — local state may be stale.
// Re-query the database for current state.
tracing::warn!("Missed {} change notifications", n);
}
}
}
# }
```

**When does lag happen?** The broadcast channel has a fixed capacity
(default 256). Lag occurs when the oldest unread messages are
overwritten. This can happen in two ways:

* A subscriber processes changes slower than they arrive
* A single transaction contains more mutating statements than the
channel capacity, causing messages to be overwritten before the
consumer reads them

This is rare under normal conditions but can occur during bulk
writes or large transactions.

**How to prevent it:**

* Increase `channel_capacity` via `ObserverConfig::with_channel_capacity`
* Process changes faster (avoid blocking in the stream consumer)
* Use a dedicated task for stream consumption

**Note:** The `broadcast::Receiver` API (from `subscribe()`) surfaces
lag as `RecvError::Lagged(n)` — the same information, just through
the raw tokio broadcast channel interface rather than the stream.

### Disabling Value Capture

By default, `TableChange` includes `old_values` and `new_values` with the actual
Expand Down
24 changes: 24 additions & 0 deletions crates/sqlx-sqlite-observer/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ impl ColumnValue {
}
}

/// Event yielded by [`TableChangeStream`](crate::stream::TableChangeStream).
///
/// Most events are `Change` variants containing the actual table change data.
/// A `Lagged` event indicates the consumer fell behind and missed some
/// notifications — consider increasing
/// [`channel_capacity`](crate::config::ObserverConfig::channel_capacity).
#[derive(Debug, Clone)]
pub enum TableChangeEvent {
/// A table change notification.
Change(TableChange),
/// The stream fell behind and missed `n` change notifications.
///
/// This can happen when:
/// - The consumer is processing changes too slowly relative to the
/// rate of database writes.
/// - A single transaction contains more mutating statements than the
/// channel capacity, causing older messages to be overwritten before
/// the consumer reads them.
///
/// When this happens, the consumer should assume its local state may
/// be stale and re-query the database for the current state.
Lagged(u64),
}

/// Notification of a change to a database table.
///
/// Contains the table name, operation type, affected rowid, and the
Expand Down
Loading