diff --git a/Cargo.lock b/Cargo.lock index 4a42447a83f..f3a10542527 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8547,6 +8547,7 @@ dependencies = [ "serde_json", "spacetimedb-core", "spacetimedb-guard", + "spacetimedb-lib 2.1.0", "tempfile", "tokio", "tokio-postgres", diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 6b753a9c8fd..60a074fc249 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -103,6 +103,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String) log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}"); StatusCode::BAD_REQUEST } + ReducerCallError::OutOfOrderInboundMessage { .. } => StatusCode::BAD_REQUEST, }; log::debug!("Error while invoking reducer {e:#}"); @@ -216,6 +217,140 @@ pub async fn call( } } +/// Path parameters for the `call_from_database` route. +#[derive(Deserialize)] +pub struct CallFromDatabaseParams { + name_or_identity: NameOrIdentity, + reducer: String, +} + +/// Query parameters for the `call_from_database` route. +/// +/// Both fields are mandatory; a missing field results in a 400 Bad Request. +#[derive(Deserialize)] +pub struct CallFromDatabaseQuery { + /// [`Identity`] of the sending database, parsed from a hex query string. + sender_identity: Identity, + /// The inter-database message ID from the sender's st_outbound_msg. + /// Used for at-most-once delivery via `st_inbound_msg`. + msg_id: u64, +} + +/// Call a reducer on behalf of another database, with deduplication. +/// +/// Endpoint: `POST /database/:name_or_identity/call-from-database/:reducer` +/// +/// Required query params: +/// - `sender_identity` — hex-encoded identity of the sending database. +/// - `msg_id` — the inter-database message ID from the sender's st_outbound_msg. 
+/// +/// Semantics: +/// - The client **must send strictly increasing `msg_id` values per `sender_identity`.** +/// - If a `msg_id` is **less than the last seen msg_id** for that sender, the request +/// is treated as **invalid and rejected with a bad request error**. +/// - If the incoming `msg_id` is equal to the last delivered msg_id, the call is treated +/// as a duplicate and **200 OK is returned without invoking the reducer**. +pub async fn call_from_database( + State(worker_ctx): State, + Extension(auth): Extension, + Path(CallFromDatabaseParams { + name_or_identity, + reducer, + }): Path, + Query(CallFromDatabaseQuery { + sender_identity, + msg_id, + }): Query, + TypedHeader(content_type): TypedHeader, + body: axum::body::Bytes, +) -> axum::response::Result { + // IDC callers send BSATN (application/octet-stream). + if content_type != headers::ContentType::octet_stream() { + return Err((StatusCode::UNSUPPORTED_MEDIA_TYPE, "Expected application/octet-stream").into()); + } + + let caller_identity = auth.claims.identity; + + let args = FunctionArgs::Bsatn(body); + let connection_id = generate_random_connection_id(); + + let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + // Call client_connected, if defined. 
+ module + .call_identity_connected(auth.into(), connection_id) + .await + .map_err(client_connected_error_to_response)?; + + let mut result = module + .call_reducer_from_database( + caller_identity, + Some(connection_id), + None, + None, + None, + &reducer, + args, + sender_identity, + msg_id, + ) + .await; + + // Wait for durability before sending response + if let Ok(rcr) = result.as_mut() + && let Some(tx_offset) = rcr.tx_offset.as_mut() + && let Some(mut durable_offset) = module.durable_tx_offset() + { + let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?; + durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?; + } + + module + .call_identity_disconnected(caller_identity, connection_id) + .await + .map_err(client_disconnected_error_to_response)?; + + match result { + Ok(rcr) => { + let (status, body) = match rcr.outcome { + ReducerOutcome::Committed => ( + StatusCode::OK, + axum::body::Body::from(rcr.reducer_return_value.unwrap_or_default()), + ), + // 422 = reducer ran but returned Err; the IDC actor uses this to distinguish + // reducer failures from other errors (which it retries). 
+ // This is inconsistent with `call` endpoint, which returns 523 status code if + // reducer fails + ReducerOutcome::Failed(errmsg) => ( + StatusCode::UNPROCESSABLE_ENTITY, + axum::body::Body::from(errmsg.to_string()), + ), + // This will be retried by IDC actor + ReducerOutcome::BudgetExceeded => { + log::warn!( + "Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}" + ); + ( + StatusCode::PAYMENT_REQUIRED, + axum::body::Body::from("Module energy budget exhausted."), + ) + } + }; + Ok(( + status, + TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)), + TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)), + body, + ) + .into_response()) + } + Err(e) => { + let (status, msg) = map_reducer_error(e, &reducer); + Err((status, msg).into()) + } + } +} + fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> { if content_type != headers::ContentType::json() { Err(axum::extract::rejection::MissingJsonContentType::default().into()) @@ -1189,6 +1324,8 @@ pub struct DatabaseRoutes { pub subscribe_get: MethodRouter, /// POST: /database/:name_or_identity/call/:reducer pub call_reducer_procedure_post: MethodRouter, + /// POST: /database/:name_or_identity/call-from-database/:reducer?sender_identity=&msg_id= + pub call_from_database_post: MethodRouter, /// GET: /database/:name_or_identity/schema pub schema_get: MethodRouter, /// GET: /database/:name_or_identity/logs @@ -1220,6 +1357,7 @@ where identity_get: get(get_identity::), subscribe_get: get(handle_websocket::), call_reducer_procedure_post: post(call::), + call_from_database_post: post(call_from_database::), schema_get: get(schema::), logs_get: get(logs::), sql_post: post(sql::), @@ -1245,6 +1383,7 @@ where .route("/identity", self.identity_get) .route("/subscribe", self.subscribe_get) .route("/call/:reducer", self.call_reducer_procedure_post) + .route("/call-from-database/:reducer", self.call_from_database_post) .route("/schema", 
self.schema_get) .route("/logs", self.logs_get) .route("/sql", self.sql_post) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 9a22c0e97d4..796ae55e9a8 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,3 +1,4 @@ +use super::idc_actor::{IdcActor, IdcActorConfig, IdcActorSender, IdcActorStarter}; use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::wasmtime::WasmtimeRuntime; @@ -22,8 +23,10 @@ use crate::util::jobs::{AllocatedJobCore, JobCores}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, bail, Context}; use async_trait::async_trait; +use bytes::Bytes; use durability::{Durability, EmptyHistory}; use log::{info, trace, warn}; +use once_cell::sync::OnceCell; use parking_lot::Mutex; use scopeguard::defer; use spacetimedb_commitlog::SizeOnDisk; @@ -117,6 +120,8 @@ pub struct HostController { db_cores: JobCores, /// The pool of buffers used to build `BsatnRowList`s in subscriptions. 
pub bsatn_rlb_pool: BsatnRowListBuilderPool, + /// Local port to be used by `IdcActor` to make remote reducer calls + idc_http_port: OnceCell, } pub(crate) struct HostRuntimes { @@ -132,11 +137,13 @@ impl HostRuntimes { } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ReducerCallResult { pub outcome: ReducerOutcome, + pub reducer_return_value: Option, pub energy_used: EnergyQuanta, pub execution_duration: Duration, + pub tx_offset: Option, } impl ReducerCallResult { @@ -228,6 +235,7 @@ impl HostController { page_pool: PagePool::new(default_config.page_pool_max_size), bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, + idc_http_port: OnceCell::new(), } } @@ -236,6 +244,10 @@ impl HostController { self.program_storage = ps; } + pub fn set_idc_http_port(&self, port: u16) -> Result<(), u16> { + self.idc_http_port.set(port) + } + /// Get a [`ModuleHost`] managed by this controller, or launch it from /// persistent state. /// @@ -706,6 +718,7 @@ async fn make_module_host( runtimes: Arc, replica_ctx: Arc, scheduler: Scheduler, + idc_sender: IdcActorSender, program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, @@ -721,6 +734,7 @@ async fn make_module_host( let mcc = ModuleCreationContext { replica_ctx, scheduler, + idc_sender, program_hash: program.hash, energy_monitor, }; @@ -758,6 +772,7 @@ struct LaunchedModule { module_host: ModuleHost, scheduler: Scheduler, scheduler_starter: SchedulerStarter, + idc_starter: IdcActorStarter, } struct ModuleLauncher { @@ -794,10 +809,12 @@ impl ModuleLauncher { .await .map(Arc::new)?; let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone()); + let (idc_starter, idc_sender) = IdcActor::open(); let (program, module_host) = make_module_host( self.runtimes.clone(), replica_ctx.clone(), scheduler.clone(), + idc_sender.clone(), self.program, self.energy_monitor, self.on_panic, @@ -814,6 +831,7 @@ impl ModuleLauncher { module_host, scheduler, scheduler_starter, 
+ idc_starter, }, )) } @@ -879,6 +897,9 @@ struct Host { /// Handle to the task responsible for cleaning up old views. /// The task is aborted when [`Host`] is dropped. view_cleanup_task: AbortHandle, + /// IDC actor: delivers outbound inter-database messages from `st_outbound_msg`. + /// Stopped when [`Host`] is dropped. + _idc_actor: IdcActor, } impl Host { @@ -1072,6 +1093,7 @@ impl Host { module_host, scheduler, scheduler_starter, + idc_starter, } = launched; // Disconnect dangling clients. @@ -1098,6 +1120,18 @@ impl Host { let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone()); + let idc_actor = idc_starter.start( + replica_ctx.relational_db().clone(), + IdcActorConfig { + sender_identity: replica_ctx.database_identity, + http_port: *host_controller + .idc_http_port + .get() + .ok_or_else(|| anyhow!("Port for IDC actor is not initialized"))?, + }, + module_host.downgrade(), + ); + let module = watch::Sender::new(module_host); Ok(Host { @@ -1107,6 +1141,7 @@ impl Host { disk_metrics_recorder_task, tx_metrics_recorder_task, view_cleanup_task, + _idc_actor: idc_actor, }) } @@ -1179,11 +1214,13 @@ impl Host { ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone()); + let (_idc_starter, idc_sender) = IdcActor::open(); let (program, module) = make_module_host( runtimes, replica_ctx.clone(), scheduler.clone(), + idc_sender, program, energy_monitor, on_panic, diff --git a/crates/core/src/host/idc_actor.rs b/crates/core/src/host/idc_actor.rs new file mode 100644 index 00000000000..4a6ed2fd886 --- /dev/null +++ b/crates/core/src/host/idc_actor.rs @@ -0,0 +1,676 @@ +/// Inter-Database Communication (IDC) Actor +/// +/// Background tokio task that: +/// 1. 
Loads undelivered entries from `st_outbound_msg` on startup, resolving delivery data from outbox tables. +/// 2. Accepts immediate notifications via an mpsc channel when new outbox rows are inserted. +/// 3. Delivers each message in msg_id order via HTTP POST to +/// `http://localhost:{http_port}/v1/database/{target_db}/call-from-database/{reducer}?sender_identity=&msg_id=` +/// 4. On transport errors (network, 5xx, 4xx except 422/402): retries infinitely with exponential +/// backoff, blocking only the affected target database (other targets continue unaffected). +/// 5. On reducer errors (HTTP 422) or budget exceeded (HTTP 402): calls the configured +/// `on_result_reducer` (read from the outbox table's schema) and deletes the st_outbound_msg row. +/// 6. Enforces sequential delivery per target database: msg N+1 is only delivered after N is done. +use crate::db::relational_db::RelationalDB; +use crate::energy::EnergyQuanta; +use crate::host::module_host::{CallReducerParams, ModuleInfo, WeakModuleHost}; +use crate::host::{FunctionArgs, ReducerCallError, ReducerCallResult, ReducerOutcome}; +use anyhow::anyhow; +use bytes::Bytes; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload}; +use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_datastore::system_tables::{StInboundMsgResultStatus, StOutboundMsgRow, ST_OUTBOUND_MSG_ID}; +use spacetimedb_datastore::traits::IsolationLevel; +use spacetimedb_lib::{AlgebraicValue, Identity, ProductValue}; +use spacetimedb_primitives::{ColId, TableId}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; + +const INITIAL_BACKOFF: Duration = Duration::from_millis(100); +const MAX_BACKOFF: Duration = Duration::from_secs(30); +/// How long to wait before polling again when there is no work. +const POLL_INTERVAL: Duration = Duration::from_millis(500); + +/// A sender that notifies the IDC actor of a new outbox row. 
+/// +/// Sending `()` wakes the actor to deliver pending messages immediately +/// rather than waiting for the next poll cycle. +pub type IdcActorSender = mpsc::UnboundedSender<()>; + +/// The identity of this (sender) database, set when the IDC actor is started. +pub struct IdcActorConfig { + pub sender_identity: Identity, + pub http_port: u16, +} + +/// A handle that, when dropped, stops the IDC actor background task. +pub struct IdcActor { + _abort: tokio::task::AbortHandle, +} + +/// Holds the receiver side of the notification channel until the actor is started. +/// +/// Mirrors the `SchedulerStarter` pattern: create the channel before the module is +/// loaded (so the sender can be stored in `InstanceEnv`), then call [`IdcActorStarter::start`] +/// once the DB is ready. +pub struct IdcActorStarter { + rx: mpsc::UnboundedReceiver<()>, +} + +impl IdcActorStarter { + /// Spawn the IDC actor background task. + pub fn start(self, db: Arc, config: IdcActorConfig, module_host: WeakModuleHost) -> IdcActor { + let abort = tokio::spawn(run_idc_loop(db, config, module_host, self.rx)).abort_handle(); + IdcActor { _abort: abort } + } +} + +impl IdcActor { + /// Open the IDC channel, returning a starter and a sender. + /// + /// Store the sender in `ModuleCreationContext` so it reaches `InstanceEnv`. + /// After the module is ready, call [`IdcActorStarter::start`] to spawn the loop. + pub fn open() -> (IdcActorStarter, IdcActorSender) { + let (tx, rx) = mpsc::unbounded_channel(); + (IdcActorStarter { rx }, tx) + } +} + +/// All data needed to deliver a single outbound message, resolved from the outbox table. +#[derive(Clone)] +struct PendingMessage { + msg_id: u64, + /// Stored for future use (e.g. deleting the outbox row after delivery). + #[allow(dead_code)] + outbox_table_id: TableId, + /// Stored for future use (e.g. deleting the outbox row after delivery). 
+ #[allow(dead_code)] + row_id: u64, + target_db_identity: Identity, + target_reducer: String, + args_bsatn: Vec, + request_row: ProductValue, + /// From the outbox table's `TableSchema::on_result_reducer`. + on_result_reducer: Option, +} + +/// Per-target-database delivery state. +struct DatabaseQueue { + queue: VecDeque, + /// When `Some`, this target is in backoff and should not be retried until this instant. + blocked_until: Option, + /// Current backoff duration for this target (doubles on each transport error). + backoff: Duration, +} + +impl DatabaseQueue { + fn new() -> Self { + Self { + queue: VecDeque::new(), + blocked_until: None, + backoff: INITIAL_BACKOFF, + } + } + + fn is_ready(&self) -> bool { + match self.blocked_until { + None => true, + Some(until) => Instant::now() >= until, + } + } + + fn record_transport_error(&mut self) { + self.blocked_until = Some(Instant::now() + self.backoff); + self.backoff = (self.backoff * 2).min(MAX_BACKOFF); + } + + fn record_success(&mut self) { + self.blocked_until = None; + self.backoff = INITIAL_BACKOFF; + } +} + +/// Outcome of a delivery attempt. +enum DeliveryOutcome { + /// Reducer succeeded (HTTP 200). + Success(Bytes), + /// Reducer ran but returned Err (HTTP 422). + ReducerError(String), + /// Budget exceeded (HTTP 402). + BudgetExceeded, + /// Transport error: database/reducer not found, network failure, unexpected HTTP status, etc. Caller should retry. + TransportError(String), +} + +/// Main IDC actor loop: maintain per-target queues and deliver messages. 
+async fn run_idc_loop( + db: Arc, + config: IdcActorConfig, + module_host: WeakModuleHost, + mut notify_rx: mpsc::UnboundedReceiver<()>, +) { + let client = reqwest::Client::builder() + .http2_prior_knowledge() + .http2_keep_alive_interval(Some(Duration::from_millis(200))) + .http2_keep_alive_timeout(Duration::from_secs(5)) + .http2_keep_alive_while_idle(true) + // Both stream and connection window sizes are 64KB by default, + // increasing the connection window to handle single stream blocking the entire connection. + .http2_initial_connection_window_size(Some(2 * 64 * 1024)) + .http2_initial_stream_window_size(Some(64 * 1024)) + .timeout(Duration::from_secs(5)) + .build() + .unwrap(); + + // Per-target-database delivery state. + let mut db_queues: HashMap = HashMap::new(); + + // On startup, load any pending messages that survived a restart. + load_pending_into_targets(&db, &mut db_queues); + + loop { + // One delivery attempt per ready target — fair round-robin across all targets. + // Each target gets exactly one shot per pass regardless of how busy it is, + // preventing a constantly-busy target from starving others. + let mut any_delivered = false; + for queue in db_queues.values_mut() { + if !queue.is_ready() { + continue; + } + let Some(msg) = queue.queue.front().cloned() else { + continue; + }; + + // If the outbox row was deleted before we could deliver, skip the + // HTTP call entirely. ST_OUTBOUND_MSG was already cleaned up by + // load_pending_into_targets when it detected the missing row. 
+ if !outbox_row_exists(&db, msg.outbox_table_id, msg.row_id) { + log::debug!( + "idc_actor: outbox row already deleted for msg_id={}; skipping delivery", + msg.msg_id, + ); + queue.queue.pop_front(); + any_delivered = true; + continue; + } + + let outcome = attempt_delivery(&client, &config, &msg).await; + match outcome { + DeliveryOutcome::TransportError(reason) => { + log::warn!( + "idc_actor: transport error delivering msg_id={} to {}: {reason}", + msg.msg_id, + msg.target_db_identity.to_hex(), + ); + queue.record_transport_error(); + // Do NOT pop the front — keep retrying this message for this target. + } + outcome => { + queue.queue.pop_front(); + queue.record_success(); + any_delivered = true; + let (result_status, result_payload) = outcome_to_result(&outcome); + finalize_message(&db, &module_host, &msg, result_status, result_payload).await; + } + } + } + + // If any target made progress, skip sleeping and immediately run another pass + // so queues drain as fast as possible. + if any_delivered { + load_pending_into_targets(&db, &mut db_queues); + continue; + } + + // Compute how long to sleep: min over all blocked targets' unblock times. + let next_unblock = db_queues + .values() + .filter_map(|s| s.blocked_until) + .min() + .map(|t| t.saturating_duration_since(Instant::now())); + let sleep_duration = next_unblock.unwrap_or(POLL_INTERVAL).min(POLL_INTERVAL); + + // Wait for a notification or the next retry time. + tokio::select! { + _ = notify_rx.recv() => { + // Drain all pending notifications (coalesce bursts). + while notify_rx.try_recv().is_ok() {} + } + _ = tokio::time::sleep(sleep_duration) => {} + } + + // Reload pending messages from DB (catches new entries). + load_pending_into_targets(&db, &mut db_queues); + } +} + +/// Decode the delivery outcome into `(result_status, result_payload)` for recording. 
+fn outcome_to_result(outcome: &DeliveryOutcome) -> (StInboundMsgResultStatus, Bytes) { + match outcome { + DeliveryOutcome::Success(payload) => (StInboundMsgResultStatus::Success, payload.clone()), + DeliveryOutcome::ReducerError(msg) => (StInboundMsgResultStatus::ReducerError, Bytes::from(msg.clone())), + DeliveryOutcome::BudgetExceeded => ( + StInboundMsgResultStatus::ReducerError, + Bytes::from("budget exceeded".to_string()), + ), + DeliveryOutcome::TransportError(_) => unreachable!("transport errors never finalize"), + } +} + +pub(crate) type ReducerSuccessAction = Box) -> anyhow::Result<()> + Send>; + +#[derive(Debug, Clone, Copy)] +pub enum ReducerSuccessActionKind { + DeleteOutboundMsg(u64), +} + +fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload { + let reducer_def = module.module_def.reducer_by_id(params.reducer_id); + Workload::Reducer(ReducerContext { + name: reducer_def.name.clone(), + caller_identity: params.caller_identity, + caller_connection_id: params.caller_connection_id, + timestamp: params.timestamp, + arg_bsatn: params.args.get_bsatn().clone(), + }) +} + +fn duplicate_result_from_st_inbound_row( + row: spacetimedb_datastore::system_tables::StInboundMsgRow, +) -> ReducerCallResult { + let outcome = match row.result_status { + StInboundMsgResultStatus::Success => ReducerOutcome::Committed, + StInboundMsgResultStatus::ReducerError => ReducerOutcome::Failed(Box::new( + String::from_utf8_lossy(&row.result_payload) + .into_owned() + .into_boxed_str(), + )), + }; + + ReducerCallResult { + outcome, + reducer_return_value: (row.result_status == StInboundMsgResultStatus::Success).then_some(row.result_payload), + energy_used: EnergyQuanta::ZERO, + execution_duration: Duration::ZERO, + tx_offset: None, + } +} + +fn record_failed_inbound_result(db: &RelationalDB, sender_identity: Identity, sender_msg_id: u64, error: &str) { + let mut dedup_tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + if let Err(e) 
= dedup_tx.upsert_inbound_last_msg( + sender_identity, + sender_msg_id, + StInboundMsgResultStatus::ReducerError, + error.to_string().into(), + ) { + log::error!("IDC: failed to record reducer error in dedup table for sender {sender_identity}: {e}"); + let _ = db.rollback_mut_tx(dedup_tx); + } else if let Err(e) = db.commit_tx(dedup_tx) { + log::error!("IDC: failed to commit dedup error record for sender {sender_identity}: {e}"); + } +} + +pub(crate) fn call_reducer_from_database( + module: &ModuleInfo, + db: &RelationalDB, + params: CallReducerParams, + sender_identity: Identity, + sender_msg_id: u64, + call_reducer: F, +) -> Result<(ReducerCallResult, bool), ReducerCallError> +where + F: FnOnce(Option, CallReducerParams, ReducerSuccessAction) -> (ReducerCallResult, bool), +{ + let tx = db.begin_mut_tx(IsolationLevel::Serializable, reducer_workload(module, ¶ms)); + if let Some(row) = tx.get_inbound_msg_row(sender_identity) { + if sender_msg_id == row.last_outbound_msg { + let _ = db.rollback_mut_tx(tx); + return Ok((duplicate_result_from_st_inbound_row(row), false)); + } + if sender_msg_id < row.last_outbound_msg { + let expected = row.last_outbound_msg + 1; + let _ = db.rollback_mut_tx(tx); + return Err(ReducerCallError::OutOfOrderInboundMessage { + sender: sender_identity, + expected, + received: sender_msg_id, + }); + } + } + + let (result, trapped) = call_reducer( + Some(tx), + params, + Box::new(move |tx, reducer_return_value| { + tx.upsert_inbound_last_msg( + sender_identity, + sender_msg_id, + StInboundMsgResultStatus::Success, + reducer_return_value.clone().unwrap_or_default(), + ) + .map_err(anyhow::Error::from) + }), + ); + + if let ReducerOutcome::Failed(err) = &result.outcome { + record_failed_inbound_result(db, sender_identity, sender_msg_id, err); + } + + Ok((result, trapped)) +} + +pub(crate) fn call_reducer_with_success_action( + module: &ModuleInfo, + db: &RelationalDB, + params: CallReducerParams, + action: ReducerSuccessActionKind, + 
call_reducer: F, +) -> (ReducerCallResult, bool) +where + F: FnOnce(Option, CallReducerParams, ReducerSuccessAction) -> (ReducerCallResult, bool), +{ + let tx = db.begin_mut_tx(IsolationLevel::Serializable, reducer_workload(module, ¶ms)); + call_reducer( + Some(tx), + params, + Box::new(move |tx, _reducer_return_value| match action { + ReducerSuccessActionKind::DeleteOutboundMsg(msg_id) => { + tx.delete_outbound_msg(msg_id).map_err(|e| anyhow!(e)) + } + }), + ) +} + +/// Finalize a delivered message: call the on_result reducer (if any), then delete from ST_OUTBOUND_MSG. +/// +/// On the happy path, `on_result_reducer` success and deletion of `st_outbound_msg` +/// are committed atomically in the same reducer transaction. +async fn finalize_message( + db: &RelationalDB, + module_host: &WeakModuleHost, + msg: &PendingMessage, + result_status: StInboundMsgResultStatus, + result_payload: Bytes, +) { + // If there is no on_result_reducer, just delete the outbound message and return. + let Some(on_result_reducer) = &msg.on_result_reducer else { + delete_message(db, msg.msg_id); + return; + }; + + // Check whether the outbox row still exists. The user may have deleted it between + // the time it was queued and now; in that case skip the callback — there is nothing + // to report back to and the ST_OUTBOUND_MSG entry was already cleaned up by + // `load_pending_into_targets`. + if !outbox_row_exists(db, msg.outbox_table_id, msg.row_id) { + log::debug!( + "idc_actor: outbox row already deleted for msg_id={}; skipping on_result reducer '{}'", + msg.msg_id, + on_result_reducer, + ); + // ST_OUTBOUND_MSG was already deleted by load_pending_into_targets when it + // detected the missing row, so there is nothing left to clean up here. 
+ return; + } + + let Some(host) = module_host.upgrade() else { + log::warn!( + "idc_actor: module host gone, cannot call on_result reducer '{}' for msg_id={}", + on_result_reducer, + msg.msg_id, + ); + delete_message(db, msg.msg_id); + return; + }; + + let mut args_bytes = Vec::new(); + if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &msg.request_row) { + log::error!( + "idc_actor: failed to encode on_result args for msg_id={}: {e}", + msg.msg_id + ); + delete_message(db, msg.msg_id); + return; + } + + let result_arg: Result<(), String> = match result_status { + StInboundMsgResultStatus::Success => Ok(()), + StInboundMsgResultStatus::ReducerError => Err(String::from_utf8_lossy(&result_payload).into_owned()), + }; + if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &result_arg) { + log::error!( + "idc_actor: failed to encode on_result result arg for msg_id={}: {e}", + msg.msg_id + ); + delete_message(db, msg.msg_id); + return; + } + + let caller_identity = Identity::ZERO; // system call + let result = host + .call_reducer_with_success_action( + caller_identity, + None, // no connection_id + None, // no client sender + None, // no request_id + None, // no timer + on_result_reducer, + FunctionArgs::Bsatn(bytes::Bytes::from(args_bytes)), + ReducerSuccessActionKind::DeleteOutboundMsg(msg.msg_id), + ) + .await; + + match result { + Ok(_) => { + log::debug!( + "idc_actor: on_result reducer '{}' called for msg_id={}", + on_result_reducer, + msg.msg_id, + ); + } + Err(e) => { + delete_message(db, msg.msg_id); + log::error!( + "idc_actor: on_result reducer '{}' failed for msg_id={}: {e:?}", + on_result_reducer, + msg.msg_id, + ); + } + } +} + +/// Load all messages from ST_OUTBOUND_MSG into the per-target queues, resolving delivery data +/// from the corresponding outbox table rows. +/// +/// A row's presence in ST_OUTBOUND_MSG means it has not yet been processed. +/// Messages already in a target's queue (by msg_id) are not re-added. 
+fn load_pending_into_targets(db: &RelationalDB, db_queues: &mut HashMap) { + let tx = db.begin_tx(Workload::Internal); + + let st_outbound_msg_rows: Vec = db + .iter(&tx, ST_OUTBOUND_MSG_ID) + .map(|iter| { + iter.filter_map(|row_ref| StOutboundMsgRow::try_from(row_ref).ok()) + .collect() + }) + .unwrap_or_else(|e| { + log::error!("idc_actor: failed to read st_outbound_msg: {e}"); + Vec::new() + }); + + let mut pending: Vec = Vec::with_capacity(st_outbound_msg_rows.len()); + + for st_row in st_outbound_msg_rows { + let outbox_table_id = TableId(st_row.outbox_table_id); + + // Read the outbox table schema for reducer name and on_result_reducer. + let schema = match db.schema_for_table(&tx, outbox_table_id) { + Ok(s) => s, + Err(e) => { + log::error!( + "idc_actor: cannot find schema for outbox table {:?} (msg_id={}): {e}", + outbox_table_id, + st_row.msg_id, + ); + continue; + } + }; + + let outbox_schema = match schema.outbox.as_ref() { + Some(o) => o, + None => { + log::error!( + "idc_actor: table {:?} (msg_id={}) is not an outbox table", + schema.table_name, + st_row.msg_id, + ); + continue; + } + }; + let target_reducer = outbox_schema.remote_reducer.to_string(); + let on_result_reducer = outbox_schema.on_result_reducer.as_ref().map(|id| id.to_string()); + + // Look up the outbox row by its auto-inc PK (col 0) to get target identity and args. + let outbox_row = db + .iter_by_col_eq(&tx, outbox_table_id, ColId(0), &AlgebraicValue::U64(st_row.row_id)) + .ok() + .and_then(|mut iter| iter.next()); + + let Some(outbox_row_ref) = outbox_row else { + // The outbox row was explicitly deleted by the user; clean up the + // now-orphaned ST_OUTBOUND_MSG entry so it is never retried. 
+ log::warn!( + "idc_actor: outbox row not found in table {:?} for row_id={} (msg_id={}); \ + deleting orphaned ST_OUTBOUND_MSG entry", + outbox_table_id, + st_row.row_id, + st_row.msg_id, + ); + delete_message(db, st_row.msg_id); + continue; + }; + + let pv = outbox_row_ref.to_product_value(); + + // Col 1: target_db_identity stored as SATS `Identity`, + // i.e. the product wrapper `(__identity__: U256)`. + let target_db_identity: Identity = match pv.elements.get(1) { + Some(AlgebraicValue::Product(identity_pv)) if identity_pv.elements.len() == 1 => { + match &identity_pv.elements[0] { + AlgebraicValue::U256(u) => Identity::from_u256(**u), + other => { + log::error!( + "idc_actor: outbox row col 1 expected Identity inner U256, got {other:?} (msg_id={})", + st_row.msg_id, + ); + continue; + } + } + } + other => { + log::error!( + "idc_actor: outbox row col 1 expected Identity wrapper, got {other:?} (msg_id={})", + st_row.msg_id, + ); + continue; + } + }; + + // Cols 2+: args for the remote reducer. + let args_bsatn = pv.elements[2..].iter().fold(Vec::new(), |mut acc, elem| { + spacetimedb_sats::bsatn::to_writer(&mut acc, elem) + .expect("writing outbox row args to BSATN should never fail"); + acc + }); + + pending.push(PendingMessage { + msg_id: st_row.msg_id, + outbox_table_id, + row_id: st_row.row_id, + target_db_identity, + target_reducer, + args_bsatn, + request_row: pv, + on_result_reducer, + }); + } + + drop(tx); + + // Sort by msg_id ascending so delivery order is preserved. + pending.sort_by_key(|m| m.msg_id); + + for msg in pending { + let state = db_queues + .entry(msg.target_db_identity) + .or_insert_with(DatabaseQueue::new); + // Only add if not already in the queue (avoid duplicates after reload). + let already_queued = state.queue.iter().any(|m| m.msg_id == msg.msg_id); + if !already_queued { + state.queue.push_back(msg); + } + } +} + +/// Attempt a single HTTP delivery of a message. 
+async fn attempt_delivery(client: &reqwest::Client, config: &IdcActorConfig, msg: &PendingMessage) -> DeliveryOutcome { + let target_db_hex = msg.target_db_identity.to_hex(); + let sender_hex = config.sender_identity.to_hex(); + + let url = format!( + "http://localhost:{}/v1/database/{target_db_hex}/call-from-database/{}?sender_identity={sender_hex}&msg_id={}", + config.http_port, msg.target_reducer, msg.msg_id, + ); + + let result = client + .post(&url) + .header("Content-Type", "application/octet-stream") + .body(msg.args_bsatn.clone()) + .send() + .await; + + match result { + Err(e) => DeliveryOutcome::TransportError(e.to_string()), + Ok(resp) => { + let status = resp.status(); + if status.is_success() { + // HTTP 200: reducer committed successfully. + DeliveryOutcome::Success(resp.bytes().await.unwrap_or_default()) + } else if status.as_u16() == 422 { + // HTTP 422: reducer ran but returned Err(...). + let body = resp.text().await.unwrap_or_default(); + DeliveryOutcome::ReducerError(body) + } else if status.as_u16() == 402 { + // HTTP 402: budget exceeded. + DeliveryOutcome::BudgetExceeded + } else { + // Any other error (503, 404, etc.) is a transport error: retry. + DeliveryOutcome::TransportError(format!("HTTP {status}")) + } + } + } +} + +/// Returns `true` if the outbox row identified by `(table_id, row_id)` still exists. +fn outbox_row_exists(db: &RelationalDB, table_id: TableId, row_id: u64) -> bool { + let tx = db.begin_tx(Workload::Internal); + db.iter_by_col_eq(&tx, table_id, ColId(0), &AlgebraicValue::U64(row_id)) + .ok() + .and_then(|mut iter| iter.next()) + .is_some() +} + +/// Delete a message from ST_OUTBOUND_MSG within a new transaction. 
+fn delete_message(db: &RelationalDB, msg_id: u64) { + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + if let Err(e) = tx.delete_outbound_msg(msg_id) { + log::error!("idc_actor: failed to delete msg_id={msg_id}: {e}"); + let _ = db.rollback_mut_tx(tx); + return; + } + if let Err(e) = db.commit_tx(tx) { + log::error!("idc_actor: failed to commit delete for msg_id={msg_id}: {e}"); + } +} diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 96b876a0368..0def781db40 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -1,3 +1,4 @@ +use super::idc_actor::IdcActorSender; use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler}; use crate::database_logger::{BacktraceFrame, BacktraceProvider, LogLevel, ModuleBacktrace, Record}; use crate::db::relational_db::{MutTx, RelationalDB}; @@ -40,6 +41,8 @@ use std::vec::IntoIter; pub struct InstanceEnv { pub replica_ctx: Arc, pub scheduler: Scheduler, + /// Sender to notify the IDC actor of new outbox rows. + pub idc_sender: IdcActorSender, pub tx: TxSlot, /// The timestamp the current function began running. pub start_time: Timestamp, @@ -224,10 +227,11 @@ impl ChunkedWriter { // Generic 'instance environment' delegated to from various host types. impl InstanceEnv { - pub fn new(replica_ctx: Arc, scheduler: Scheduler) -> Self { + pub fn new(replica_ctx: Arc, scheduler: Scheduler, idc_sender: IdcActorSender) -> Self { Self { replica_ctx, scheduler, + idc_sender, tx: TxSlot::default(), start_time: Timestamp::now(), start_instant: Instant::now(), @@ -385,6 +389,10 @@ impl InstanceEnv { self.schedule_row(stdb, tx, table_id, row_ptr)?; } + if insert_flags.is_outbox_table { + self.enqueue_outbox_row(stdb, tx, table_id, row_ptr)?; + } + // Note, we update the metric for bytes written after the insert. // This is to capture auto-inc columns. 
tx.metrics.bytes_written += buffer.len(); @@ -424,6 +432,48 @@ impl InstanceEnv { Ok(()) } + /// Enqueue an outbox row into ST_OUTBOUND_MSG atomically within the current transaction, + /// and notify the IDC actor so it delivers without waiting for the next poll cycle. + /// + /// Outbox tables have: + /// - Col 0: auto-inc primary key (u64) — stored as `row_id` in ST_OUTBOUND_MSG. + /// - Col 1: target database Identity (stored as U256). + /// - Remaining cols: args for the remote reducer. + /// + /// The `on_result_reducer` and delivery data are resolved at delivery time from the + /// outbox table's schema and row, so ST_OUTBOUND_MSG only stores the minimal reference. + fn enqueue_outbox_row( + &self, + _stdb: &RelationalDB, + tx: &mut MutTx, + table_id: TableId, + row_ptr: RowPointer, + ) -> Result<(), NodesError> { + use spacetimedb_datastore::locking_tx_datastore::state_view::StateView as _; + + let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap(); + let pv = row_ref.to_product_value(); + + // Col 0 is the auto-inc primary key — this is the row_id we store in ST_OUTBOUND_MSG. + let row_id = match pv.elements.first() { + Some(AlgebraicValue::U64(id)) => *id, + other => { + let schema = tx.schema_for_table(table_id).map_err(DBError::from)?; + return Err(NodesError::Internal(Box::new(DBError::Other(anyhow::anyhow!( + "outbox table {}: expected col 0 to be U64 (auto-inc PK), got {other:?}", + schema.table_name + ))))); + } + }; + + tx.insert_st_outbound_msg(table_id.0, row_id).map_err(DBError::from)?; + + // Wake the IDC actor immediately so it doesn't wait for the next poll cycle. 
+ let _ = self.idc_sender.send(()); + + Ok(()) + } + pub fn update(&self, table_id: TableId, index_id: IndexId, buffer: &mut [u8]) -> Result { let stdb = self.relational_db(); let tx = &mut *self.get_tx()?; @@ -1380,7 +1430,11 @@ mod test { fn instance_env(db: Arc) -> Result<(InstanceEnv, tokio::runtime::Runtime)> { let (scheduler, _) = Scheduler::open(db.clone()); let (replica_context, runtime) = replica_ctx(db)?; - Ok((InstanceEnv::new(Arc::new(replica_context), scheduler), runtime)) + let (_, idc_sender) = crate::host::idc_actor::IdcActor::open(); + Ok(( + InstanceEnv::new(Arc::new(replica_context), scheduler, idc_sender), + runtime, + )) } /// An in-memory `RelationalDB` for testing. diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index ae5e1fcafc4..aadd38cf9a5 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -12,6 +12,7 @@ use spacetimedb_schema::def::ModuleDef; mod disk_storage; mod host_controller; +pub mod idc_actor; mod module_common; #[allow(clippy::too_many_arguments)] pub mod module_host; diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index bff864759f3..548922234d9 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -4,6 +4,7 @@ use crate::{ energy::EnergyMonitor, host::{ + idc_actor::IdcActorSender, module_host::ModuleInfo, wasm_common::{module_host_actor::DescribeError, DESCRIBE_MODULE_DUNDER}, Scheduler, @@ -37,7 +38,13 @@ pub fn build_common_module_from_raw( replica_ctx.subscriptions.clone(), ); - Ok(ModuleCommon::new(replica_ctx, mcc.scheduler, info, mcc.energy_monitor)) + Ok(ModuleCommon::new( + replica_ctx, + mcc.scheduler, + mcc.idc_sender, + info, + mcc.energy_monitor, + )) } /// Non-runtime-specific parts of a module. 
@@ -45,6 +52,7 @@ pub fn build_common_module_from_raw( pub(crate) struct ModuleCommon { replica_context: Arc, scheduler: Scheduler, + idc_sender: IdcActorSender, info: Arc, energy_monitor: Arc, } @@ -54,12 +62,14 @@ impl ModuleCommon { fn new( replica_context: Arc, scheduler: Scheduler, + idc_sender: IdcActorSender, info: Arc, energy_monitor: Arc, ) -> Self { Self { replica_context, scheduler, + idc_sender, info, energy_monitor, } @@ -89,6 +99,10 @@ impl ModuleCommon { pub fn scheduler(&self) -> &Scheduler { &self.scheduler } + + pub fn idc_sender(&self) -> IdcActorSender { + self.idc_sender.clone() + } } /// Runs the describer of modules in `run` and does some logging around it. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6ed0f0ec97c..eb6f0f2a3d3 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -946,6 +946,12 @@ pub enum ReducerCallError { ScheduleReducerNotFound, #[error("can't directly call special {0:?} lifecycle reducer")] LifecycleReducer(Lifecycle), + #[error("out-of-order inbound message id {received} from {sender}; expected {expected}")] + OutOfOrderInboundMessage { + sender: Identity, + expected: u64, + received: u64, + }, } #[derive(Debug, PartialEq, Eq)] @@ -1605,6 +1611,42 @@ impl ModuleHost { .await? 
} + async fn call_reducer_with_success_action_inner( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_id: ReducerId, + reducer_def: &ReducerDef, + args: FunctionArgs, + action: crate::host::idc_actor::ReducerSuccessActionKind, + ) -> Result { + let args = args + .into_tuple_for_def(&self.info.module_def, reducer_def) + .map_err(InvalidReducerArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + let call_reducer_params = CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }; + + self.call( + &reducer_def.name, + (call_reducer_params, action), + async |(p, action), inst| Ok(inst.call_reducer_with_success_action(p, action)), + async |(p, action), inst| inst.call_reducer_with_success_action(p, action).await, + ) + .await? + } + pub async fn call_reducer( &self, caller_identity: Identity, @@ -1655,6 +1697,123 @@ impl ModuleHost { res } + pub async fn call_reducer_with_success_action( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_name: &str, + args: FunctionArgs, + action: crate::host::idc_actor::ReducerSuccessActionKind, + ) -> Result { + let res = async { + let (reducer_id, reducer_def) = self + .info + .module_def + .reducer_full(reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + if let Some(lifecycle) = reducer_def.lifecycle { + return Err(ReducerCallError::LifecycleReducer(lifecycle)); + } + + if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + return Err(ReducerCallError::NoSuchReducer); + } + + self.call_reducer_with_success_action_inner( + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + reducer_def, + args, + action, + ) + .await + } + .await; + + let log_message 
= match &res { + Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)), + Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)), + _ => None, + }; + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, reducer_name, &log_message) + } + + res + } + + pub async fn call_reducer_from_database( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_name: &str, + args: FunctionArgs, + sender_database_identity: Identity, + sender_msg_id: u64, + ) -> Result { + let res = async { + let (reducer_id, reducer_def) = self + .info + .module_def + .reducer_full(reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + if let Some(lifecycle) = reducer_def.lifecycle { + return Err(ReducerCallError::LifecycleReducer(lifecycle)); + } + + if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + return Err(ReducerCallError::NoSuchReducer); + } + + let call_reducer_params = Self::call_reducer_params( + &self.info, + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + reducer_def, + args, + )?; + + self.call( + &reducer_def.name, + (call_reducer_params, sender_database_identity, sender_msg_id), + async |(p, sender_database_identity, sender_msg_id), inst| { + inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id) + }, + async |(p, sender_database_identity, sender_msg_id), inst| { + inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id) + .await + }, + ) + .await? 
+ } + .await; + + let log_message = match &res { + Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)), + Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)), + _ => None, + }; + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, reducer_name, &log_message) + } + + res + } + pub async fn call_view_add_single_subscription( &self, sender: Arc, diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index d3b285e9f16..eac5ca47110 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -443,7 +443,7 @@ pub(super) async fn call_scheduled_function( // print their message and backtrace when they occur, so we don't need to do // anything with the error payload. let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - inst_common.call_reducer_with_tx(Some(tx), params, inst) + inst_common.call_reducer_with_tx(Some(tx), params, inst, |_tx, _ret| Ok(())) })); let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst); // Currently, we drop the return value from the function call. 
In the future, diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index cf096839d47..b697b582164 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -664,6 +664,20 @@ impl JsInstance { .unwrap_or_else(|_| panic!("worker should stay live while calling a reducer")) } + pub async fn call_reducer_with_success_action( + &self, + params: CallReducerParams, + action: crate::host::idc_actor::ReducerSuccessActionKind, + ) -> ReducerCallResult { + self.send_request(|reply_tx| JsWorkerRequest::CallReducerWithSuccessAction { + reply_tx, + params, + action, + }) + .await + .unwrap_or_else(|_| panic!("worker should stay live while calling a reducer")) + } + pub async fn clear_all_clients(&self) -> anyhow::Result<()> { self.send_request(JsWorkerRequest::ClearAllClients) .await @@ -763,6 +777,19 @@ enum JsWorkerRequest { reply_tx: JsReplyTx, params: CallReducerParams, }, + /// See [`JsInstance::call_reducer_from_database`]. + CallReducerFromDatabase { + reply_tx: JsReplyTx>, + params: CallReducerParams, + sender_identity: Identity, + sender_msg_id: u64, + }, + /// See [`JsInstance::call_reducer_with_success_action`]. + CallReducerWithSuccessAction { + reply_tx: JsReplyTx, + params: CallReducerParams, + action: crate::host::idc_actor::ReducerSuccessActionKind, + }, /// See [`JsInstance::call_view`]. CallView { reply_tx: JsReplyTx, @@ -1173,6 +1200,42 @@ impl JsInstanceLane { .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer"))) } + pub async fn call_reducer_from_database( + &self, + params: CallReducerParams, + sender_identity: Identity, + sender_msg_id: u64, + ) -> Result { + self.run_once("call_reducer", |inst: JsInstance| async move { + inst.send_request(|reply_tx| JsWorkerRequest::CallReducerFromDatabase { + reply_tx, + params, + sender_identity, + sender_msg_id, + }) + .await + }) + .await + .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer")))? 
+ } + + pub async fn call_reducer_with_success_action( + &self, + params: CallReducerParams, + action: crate::host::idc_actor::ReducerSuccessActionKind, + ) -> Result { + self.run_once("call_reducer", |inst: JsInstance| async move { + inst.send_request(|reply_tx| JsWorkerRequest::CallReducerWithSuccessAction { + reply_tx, + params, + action, + }) + .await + }) + .await + .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer"))) + } + /// Clear all instance-lane client state exactly once. pub async fn clear_all_clients(&self) -> anyhow::Result<()> { self.run_once("clear_all_clients", |inst: JsInstance| async move { @@ -1353,11 +1416,11 @@ async fn spawn_instance_worker( scope_with_context!(let scope, &mut isolate, Context::new(scope, Default::default())); // Setup the instance environment. - let (replica_ctx, scheduler) = match &module_or_mcc { - Either::Left(module) => (module.replica_ctx(), module.scheduler()), - Either::Right(mcc) => (&mcc.replica_ctx, &mcc.scheduler), + let (replica_ctx, scheduler, idc_sender) = match &module_or_mcc { + Either::Left(module) => (module.replica_ctx(), module.scheduler(), module.idc_sender()), + Either::Right(mcc) => (&mcc.replica_ctx, &mcc.scheduler, mcc.idc_sender.clone()), }; - let instance_env = InstanceEnv::new(replica_ctx.clone(), scheduler.clone()); + let instance_env = InstanceEnv::new(replica_ctx.clone(), scheduler.clone(), idc_sender); scope.set_slot(JsInstanceEnv::new(instance_env)); catch_exception(scope, |scope| Ok(builtins::evaluate_builtins(scope)?)) @@ -1420,10 +1483,17 @@ async fn spawn_instance_worker( let mut requests_since_heap_check = 0u64; let mut last_heap_check_at = Instant::now(); for request in request_rx.iter() { + let info_for_idc = instance_common.info(); + let db_for_idc = inst.replica_ctx().relational_db().clone(); if let Some(metric) = &worker_queue_metric { metric.dec(); } - let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst); 
+ + let mut call_reducer_with_success = + |tx, params, on_success: crate::host::idc_actor::ReducerSuccessAction| { + instance_common.call_reducer_with_tx(tx, params, &mut inst, on_success) + }; + let mut call_reducer = |tx, params| call_reducer_with_success(tx, params, Box::new(|_tx, _ret| Ok(()))); let mut should_exit = false; core_pinner.pin_if_changed(); @@ -1447,6 +1517,50 @@ async fn spawn_instance_worker( send_worker_reply("call_reducer", reply_tx, res); should_exit = trapped; } + JsWorkerRequest::CallReducerFromDatabase { + reply_tx, + params, + sender_identity, + sender_msg_id, + } => { + let res = crate::host::idc_actor::call_reducer_from_database( + info_for_idc.as_ref(), + db_for_idc.as_ref(), + params, + sender_identity, + sender_msg_id, + |tx, params, on_success| call_reducer_with_success(tx, params, on_success), + ); + let (res, trapped) = match res { + Ok((rcr, trapped)) => (Ok(rcr), trapped), + Err(err) => (Err(err), false), + }; + if trapped { + worker_state_in_thread.mark_trapped(); + } + + send_worker_reply("call_reducer", reply_tx, res); + should_exit = trapped; + } + JsWorkerRequest::CallReducerWithSuccessAction { + reply_tx, + params, + action, + } => { + let (res, trapped) = crate::host::idc_actor::call_reducer_with_success_action( + info_for_idc.as_ref(), + db_for_idc.as_ref(), + params, + action, + |tx, params, on_success| call_reducer_with_success(tx, params, on_success), + ); + if trapped { + worker_state_in_thread.mark_trapped(); + } + + send_worker_reply("call_reducer", reply_tx, res); + should_exit = trapped; + } JsWorkerRequest::CallView { reply_tx, cmd } => { let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst); if trapped { diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a711e0f18dc..10a327f0088 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,4 +1,5 @@ use 
super::instrumentation::CallTimes; + use super::*; use crate::client::ClientActorId; use crate::database_logger; @@ -375,7 +376,7 @@ impl WasmModuleHostActor { func_names }; let uninit_instance = module.instantiate_pre()?; - let instance_env = InstanceEnv::new(mcc.replica_ctx.clone(), mcc.scheduler.clone()); + let instance_env = InstanceEnv::new(mcc.replica_ctx.clone(), mcc.scheduler.clone(), mcc.idc_sender.clone()); let mut instance = uninit_instance.instantiate(instance_env, &func_names)?; let desc = instance.extract_descriptions()?; @@ -422,7 +423,11 @@ impl WasmModuleHostActor { pub fn create_instance(&self) -> WasmModuleInstance { let common = &self.common; - let env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone()); + let env = InstanceEnv::new( + common.replica_ctx().clone(), + common.scheduler().clone(), + common.idc_sender(), + ); // this shouldn't fail, since we already called module.create_instance() // before and it didn't error, and ideally they should be deterministic let mut instance = self @@ -462,7 +467,49 @@ impl WasmModuleInstance { } pub fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult { - let (res, trapped) = self.call_reducer_with_tx(None, params); + let (res, trapped) = self.call_reducer_with_tx(None, params, |_tx, _ret| Ok(())); + self.trapped = trapped; + res + } + + pub fn call_reducer_from_database( + &mut self, + params: CallReducerParams, + sender_identity: Identity, + sender_msg_id: u64, + ) -> Result { + let info = self.common.info.clone(); + let db = self.instance.replica_ctx().relational_db().clone(); + let (res, trapped) = crate::callgrind_flag::invoke_allowing_callgrind(|| { + crate::host::idc_actor::call_reducer_from_database( + info.as_ref(), + db.as_ref(), + params, + sender_identity, + sender_msg_id, + |tx, params, on_success| self.call_reducer_with_tx(tx, params, on_success), + ) + })?; + self.trapped = trapped; + Ok(res) + } + + pub fn call_reducer_with_success_action( + 
&mut self, + params: CallReducerParams, + action: crate::host::idc_actor::ReducerSuccessActionKind, + ) -> ReducerCallResult { + let info = self.common.info.clone(); + let db = self.instance.replica_ctx().relational_db().clone(); + let (res, trapped) = crate::callgrind_flag::invoke_allowing_callgrind(|| { + crate::host::idc_actor::call_reducer_with_success_action( + info.as_ref(), + db.as_ref(), + params, + action, + |tx, params, on_success| self.call_reducer_with_tx(tx, params, on_success), + ) + }); self.trapped = trapped; res } @@ -477,7 +524,7 @@ impl WasmModuleInstance { caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { let module = &self.common.info.clone(); - let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); + let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params, |_tx, _ret| Ok(())); let mut trapped = false; let res = call_identity_connected(caller_auth, caller_connection_id, module, call_reducer, &mut trapped); self.trapped = trapped; @@ -490,7 +537,7 @@ impl WasmModuleInstance { caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { let module = &self.common.info.clone(); - let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); + let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params, |_tx, _ret| Ok(())); let mut trapped = false; let res = ModuleHost::call_identity_disconnected_inner( caller_identity, @@ -505,7 +552,7 @@ impl WasmModuleInstance { pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { let module = &self.common.info.clone(); - let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); + let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params, |_tx, _ret| Ok(())); let mut trapped = false; let res = ModuleHost::disconnect_client_inner(client_id, module, call_reducer, &mut trapped); self.trapped = trapped; @@ -515,7 +562,7 @@ impl WasmModuleInstance { pub fn 
init_database(&mut self, program: Program) -> anyhow::Result> { let module_def = &self.common.info.clone().module_def; let replica_ctx = &self.instance.replica_ctx().clone(); - let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); + let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params, |_tx, _ret| Ok(())); let (res, trapped) = init_database(replica_ctx, module_def, program, call_reducer); self.trapped = trapped; res @@ -539,9 +586,18 @@ impl WasmModuleInstance { impl WasmModuleInstance { #[tracing::instrument(level = "trace", skip_all)] - fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + fn call_reducer_with_tx( + &mut self, + tx: Option, + params: CallReducerParams, + on_success: F, + ) -> (ReducerCallResult, bool) + where + F: FnOnce(&mut MutTxId, &Option) -> anyhow::Result<()>, + { crate::callgrind_flag::invoke_allowing_callgrind(|| { - self.common.call_reducer_with_tx(tx, params, &mut self.instance) + self.common + .call_reducer_with_tx(tx, params, &mut self.instance, on_success) }) } @@ -805,12 +861,16 @@ impl InstanceCommon { /// /// The `bool` in the return type signifies whether there was an "outer error". /// For WASM, this should be interpreted as a trap occurring. 
- pub(crate) fn call_reducer_with_tx( + pub(crate) fn call_reducer_with_tx( &mut self, tx: Option, params: CallReducerParams, inst: &mut I, - ) -> (ReducerCallResult, bool) { + on_success: F, + ) -> (ReducerCallResult, bool) + where + F: FnOnce(&mut MutTxId, &Option) -> anyhow::Result<()>, + { let CallReducerParams { timestamp, caller_identity, @@ -824,7 +884,7 @@ impl InstanceCommon { let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id); let replica_ctx = inst.replica_ctx(); - let stdb = replica_ctx.relational_db(); + let stdb = replica_ctx.relational_db().clone(); let info = self.info.clone(); let reducer_def = info.module_def.reducer_by_id(reducer_id); let reducer_name = &reducer_def.name; @@ -844,6 +904,7 @@ impl InstanceCommon { let workload = Workload::Reducer(ReducerContext::from(op.clone())); let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload)); + let mut tx_slot = inst.tx_slot(); let vm_metrics = self.vm_metrics.get_for_reducer_id(reducer_id); @@ -885,12 +946,16 @@ impl InstanceCommon { Ok(return_value) => { // If this is an OnDisconnect lifecycle event, remove the client from st_clients. // We handle OnConnect events before running the reducer. 
- let res = match reducer_def.lifecycle { + let lifecycle_res = match reducer_def.lifecycle { Some(Lifecycle::OnDisconnect) => { tx.delete_st_client(caller_identity, caller_connection_id, info.database_identity) } _ => Ok(()), }; + let res = lifecycle_res + .map_err(anyhow::Error::from) + // Call `on_success`, when about to commit + .and_then(|_| on_success(&mut tx, &return_value)); match res { Ok(()) => (EventStatus::Committed(DatabaseUpdate::default()), return_value), Err(err) => { @@ -948,12 +1013,16 @@ impl InstanceCommon { request_id, timer, }; - let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event; + let committed = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx); + let tx_offset = committed.tx_offset; + let event = committed.event; let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), + reducer_return_value: event.reducer_return_value.clone(), energy_used: energy_quanta_used, execution_duration: total_duration, + tx_offset: Some(tx_offset), }; (res, trapped) diff --git a/crates/core/src/module_host_context.rs b/crates/core/src/module_host_context.rs index 50f8c258a37..cd8293fb78c 100644 --- a/crates/core/src/module_host_context.rs +++ b/crates/core/src/module_host_context.rs @@ -1,4 +1,5 @@ use crate::energy::EnergyMonitor; +use crate::host::idc_actor::IdcActorSender; use crate::host::scheduler::Scheduler; use crate::replica_context::ReplicaContext; use spacetimedb_sats::hash::Hash; @@ -7,6 +8,7 @@ use std::sync::Arc; pub struct ModuleCreationContext { pub replica_ctx: Arc, pub scheduler: Scheduler, + pub idc_sender: IdcActorSender, pub program_hash: Hash, pub energy_monitor: Arc, } diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 476a73a9a45..47eb67a02f2 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -340,6 +340,7 @@ pub(crate) mod tests { None, false, None, + None, ), )?; let schema = 
db.schema_for_table_mut(tx, table_id)?; diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 3472fae82d5..7cc460364a5 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -19,7 +19,7 @@ use crate::{ is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StFields, StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, - ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, + ST_CONSTRAINT_NAME, ST_INBOUND_MSG_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX, @@ -30,9 +30,10 @@ use crate::{ locking_tx_datastore::ViewCallInfo, system_tables::{ ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, - ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX, ST_TABLE_ACCESSOR_ID, - ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, - ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, + ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INBOUND_MSG_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX, + ST_OUTBOUND_MSG_ID, ST_OUTBOUND_MSG_IDX, ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, + ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, + ST_VIEW_SUB_IDX, }, }; use anyhow::anyhow; @@ -476,6 +477,8 @@ impl CommittedState { 
self.create_table(ST_TABLE_ACCESSOR_ID, schemas[ST_TABLE_ACCESSOR_IDX].clone()); self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone()); self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone()); + self.create_table(ST_INBOUND_MSG_ID, schemas[ST_INBOUND_MSG_IDX].clone()); + self.create_table(ST_OUTBOUND_MSG_ID, schemas[ST_OUTBOUND_MSG_IDX].clone()); // Insert the sequences into `st_sequences` let (st_sequences, blob_store, pool) = diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 336476fa6e6..23dd3a19cf7 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1306,16 +1306,17 @@ mod tests { use crate::locking_tx_datastore::tx_state::PendingSchemaChange; use crate::system_tables::{ system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields, - StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, - StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, - ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, - ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, - ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_NAME, ST_INDEX_ID, - ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, - ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, - ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, - ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, - ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, ST_VIEW_SUB_NAME, + StConstraintRow, 
StEventTableFields, StInboundMsgFields, StIndexAlgorithm, StIndexFields, StIndexRow, + StOutboundMsgFields, StRowLevelSecurityFields, StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, + StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ACCESSOR_ID, + ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, + ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, + ST_INBOUND_MSG_ID, ST_INBOUND_MSG_NAME, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_NAME, ST_INDEX_ID, + ST_INDEX_NAME, ST_MODULE_NAME, ST_OUTBOUND_MSG_ID, ST_OUTBOUND_MSG_NAME, ST_RESERVED_SEQUENCE_RANGE, + ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, + ST_SEQUENCE_NAME, ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, + ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, + ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, ST_VIEW_SUB_NAME, }; use crate::traits::{IsolationLevel, MutTx}; use crate::Result; @@ -1648,6 +1649,7 @@ mod tests { pk, false, None, + None, ) } @@ -1784,6 +1786,8 @@ mod tests { TableRow { id: ST_TABLE_ACCESSOR_ID.into(), name: ST_TABLE_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_INDEX_ACCESSOR_ID.into(), name: ST_INDEX_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_COLUMN_ACCESSOR_ID.into(), name: ST_COLUMN_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, + TableRow { id: ST_INBOUND_MSG_ID.into(), name: ST_INBOUND_MSG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StInboundMsgFields::DatabaseIdentity.into()) }, + TableRow { id: ST_OUTBOUND_MSG_ID.into(), name: ST_OUTBOUND_MSG_NAME, ty: StTableType::System, access: 
StAccess::Public, primary_key: Some(StOutboundMsgFields::MsgId.into()) }, ])); #[rustfmt::skip] @@ -1881,6 +1885,15 @@ mod tests { ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 0, name: "table_name", ty: AlgebraicType::String }, ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 1, name: "col_name", ty: AlgebraicType::String }, ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 2, name: "accessor_name", ty: AlgebraicType::String }, + + ColRow { table: ST_INBOUND_MSG_ID.into(), pos: 0, name: "database_identity", ty: AlgebraicType::U256 }, + ColRow { table: ST_INBOUND_MSG_ID.into(), pos: 1, name: "last_outbound_msg", ty: AlgebraicType::U64 }, + ColRow { table: ST_INBOUND_MSG_ID.into(), pos: 2, name: "result_status", ty: AlgebraicType::U8 }, + ColRow { table: ST_INBOUND_MSG_ID.into(), pos: 3, name: "result_payload", ty: AlgebraicType::bytes() }, + + ColRow { table: ST_OUTBOUND_MSG_ID.into(), pos: 0, name: "msg_id", ty: AlgebraicType::U64 }, + ColRow { table: ST_OUTBOUND_MSG_ID.into(), pos: 1, name: "outbox_table_id", ty: TableId::get_type() }, + ColRow { table: ST_OUTBOUND_MSG_ID.into(), pos: 2, name: "row_id", ty: AlgebraicType::U64 }, ])); #[rustfmt::skip] assert_eq!(query.scan_st_indexes()?, map_array([ @@ -1913,6 +1926,8 @@ mod tests { IndexRow { id: 27, table: ST_INDEX_ACCESSOR_ID.into(), col: col(1), name: "st_index_accessor_accessor_name_idx_btree", }, IndexRow { id: 28, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 1], name: "st_column_accessor_table_name_col_name_idx_btree", }, IndexRow { id: 29, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 2], name: "st_column_accessor_table_name_accessor_name_idx_btree", }, + IndexRow { id: 30, table: ST_INBOUND_MSG_ID.into(), col: col(0), name: "st_inbound_msg_database_identity_idx_btree", }, + IndexRow { id: 31, table: ST_OUTBOUND_MSG_ID.into(), col: col(0), name: "st_outbound_msg_msg_id_idx_btree", }, ])); let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1; #[rustfmt::skip] @@ -1925,6 +1940,7 @@ 
mod tests { SequenceRow { id: 4, table: ST_SCHEDULED_ID.into(), col_pos: 0, name: "st_scheduled_schedule_id_seq", start }, SequenceRow { id: 6, table: ST_VIEW_ID.into(), col_pos: 0, name: "st_view_view_id_seq", start }, SequenceRow { id: 7, table: ST_VIEW_ARG_ID.into(), col_pos: 0, name: "st_view_arg_id_seq", start }, + SequenceRow { id: 8, table: ST_OUTBOUND_MSG_ID.into(), col_pos: 0, name: "st_outbound_msg_msg_id_seq", start }, ], |row| StSequenceRow { allocated: start - 1, @@ -1958,6 +1974,8 @@ mod tests { ConstraintRow { constraint_id: 23, table_id: ST_INDEX_ACCESSOR_ID.into(), unique_columns: col(1), constraint_name: "st_index_accessor_accessor_name_key", }, ConstraintRow { constraint_id: 24, table_id: ST_COLUMN_ACCESSOR_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_column_accessor_table_name_col_name_key", }, ConstraintRow { constraint_id: 25, table_id: ST_COLUMN_ACCESSOR_ID.into(), unique_columns: col_list![0, 2], constraint_name: "st_column_accessor_table_name_accessor_name_key", }, + ConstraintRow { constraint_id: 26, table_id: ST_INBOUND_MSG_ID.into(), unique_columns: col(0), constraint_name: "st_inbound_msg_database_identity_key", }, + ConstraintRow { constraint_id: 27, table_id: ST_OUTBOUND_MSG_ID.into(), unique_columns: col(0), constraint_name: "st_outbound_msg_msg_id_key", }, ])); // Verify we get back the tables correctly with the proper ids... 
@@ -2391,6 +2409,8 @@ mod tests { IndexRow { id: 27, table: ST_INDEX_ACCESSOR_ID.into(), col: col(1), name: "st_index_accessor_accessor_name_idx_btree", }, IndexRow { id: 28, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 1], name: "st_column_accessor_table_name_col_name_idx_btree", }, IndexRow { id: 29, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 2], name: "st_column_accessor_table_name_accessor_name_idx_btree", }, + IndexRow { id: 30, table: ST_INBOUND_MSG_ID.into(), col: col(0), name: "st_inbound_msg_database_identity_idx_btree", }, + IndexRow { id: 31, table: ST_OUTBOUND_MSG_ID.into(), col: col(0), name: "st_outbound_msg_msg_id_idx_btree", }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 6e817fbdccd..c7ae602f48f 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -20,12 +20,13 @@ use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields, - StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields, - StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, - StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, - StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, - ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, 
ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, + StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StInboundMsgFields, + StInboundMsgRow, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StOutboundMsgFields, + StOutboundMsgRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, + StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow, + SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, + ST_INBOUND_MSG_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_OUTBOUND_MSG_ID, ST_ROW_LEVEL_SECURITY_ID, + ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, }, }; use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow}; @@ -34,6 +35,7 @@ use crate::{ locking_tx_datastore::state_view::ScanOrIndex, traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}, }; +use bytes::Bytes; use core::{cell::RefCell, iter, mem, ops::RangeBounds}; use itertools::Either; use smallvec::SmallVec; @@ -2638,6 +2640,90 @@ impl MutTxId { .map(|row| row.pointer()) } + /// Look up the inbound dedup record for `sender_identity` in `st_inbound_msg`. + /// + /// Returns `None` if no entry exists for this sender (i.e., no message has been delivered yet). + pub fn get_inbound_msg_row(&self, sender_identity: Identity) -> Option { + self.iter_by_col_eq( + ST_INBOUND_MSG_ID, + StInboundMsgFields::DatabaseIdentity.col_id(), + &IdentityViaU256::from(sender_identity).into(), + ) + .expect("failed to read from st_inbound_msg system table") + .next() + .and_then(|row_ref| StInboundMsgRow::try_from(row_ref).ok()) + } + + /// Update the last delivered msg_id for `sender_identity` in `st_inbound_msg`. + /// + /// If an entry already exists, it is replaced; otherwise a new entry is inserted. + /// `result_status` and `result_payload` store the outcome of the reducer call. 
+ pub fn upsert_inbound_last_msg( + &mut self, + sender_identity: Identity, + last_outbound_msg: u64, + result_status: crate::system_tables::StInboundMsgResultStatus, + result_payload: Bytes, + ) -> Result<()> { + // Delete the existing row if present. + self.delete_col_eq( + ST_INBOUND_MSG_ID, + StInboundMsgFields::DatabaseIdentity.col_id(), + &IdentityViaU256::from(sender_identity).into(), + )?; + let row = StInboundMsgRow { + database_identity: sender_identity.into(), + last_outbound_msg, + result_status, + result_payload, + }; + self.insert_via_serialize_bsatn(ST_INBOUND_MSG_ID, &row) + .map(|_| ()) + .inspect_err(|e| { + log::error!( + "upsert_inbound_last_outbound_msg: failed to upsert last_outbound_msg for {sender_identity} to {last_outbound_msg}: {e}" + ); + }) + } + + /// Insert a new outbound inter-database message into `st_outbound_msg`. + pub fn insert_st_outbound_msg(&mut self, outbox_table_id: u32, row_id: u64) -> Result<()> { + let row = StOutboundMsgRow { + msg_id: 0, // auto-incremented by the sequence + outbox_table_id, + row_id, + }; + self.insert_via_serialize_bsatn(ST_OUTBOUND_MSG_ID, &row) + .map(|_| ()) + .inspect_err(|e| { + log::error!("insert_st_outbound_msg: failed to insert msg for outbox_table_id={outbox_table_id}: {e}"); + }) + } + + /// Retrieve all outbound messages from `st_outbound_msg`, ordered by msg_id ascending. + pub fn all_outbound_msgs(&self) -> Result> { + let mut rows: Vec = self + .iter(ST_OUTBOUND_MSG_ID) + .expect("failed to read from st_outbound_msg system table") + .filter_map(|row_ref| StOutboundMsgRow::try_from(row_ref).ok()) + .collect(); + rows.sort_by_key(|r| r.msg_id); + Ok(rows) + } + + /// Delete a message from `st_outbound_msg` once it has been fully processed. 
+ pub fn delete_outbound_msg(&mut self, msg_id: u64) -> Result<()> { + self.delete_col_eq( + ST_OUTBOUND_MSG_ID, + StOutboundMsgFields::MsgId.col_id(), + &AlgebraicValue::U64(msg_id), + ) + .map(|_| ()) + .inspect_err(|e| { + log::error!("delete_msg_id: failed to delete msg_id={msg_id}: {e}"); + }) + } + pub fn insert_via_serialize_bsatn<'a, T: Serialize>( &'a mut self, table_id: TableId, @@ -2796,6 +2882,7 @@ pub(super) fn insert<'a, const GENERATE: bool>( let insert_flags = InsertFlags { is_scheduler_table: tx_table.is_scheduler(), + is_outbox_table: tx_table.is_outbox(), }; let ok = |row_ref| Ok((gen_cols, row_ref, insert_flags)); diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index 8c4c978aaa0..efb393c3630 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -268,6 +268,7 @@ pub trait StateView { table_primary_key, is_event, table_alias, + None, )) } diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index e75cc76b365..898574b5d51 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -11,6 +11,7 @@ //! - Use [`st_fields_enum`] to define its column enum. //! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function. +use bytes::Bytes; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::db::raw_def::v9::{btree, RawSql}; @@ -89,6 +90,12 @@ pub const ST_INDEX_ACCESSOR_ID: TableId = TableId(19); /// The static ID of the table that maps canonical column names to accessor names pub const ST_COLUMN_ACCESSOR_ID: TableId = TableId(20); +/// The static ID of the table that tracks the last inbound msg id per sender database. 
+pub const ST_INBOUND_MSG_ID: TableId = TableId(21); + +/// The static ID of the table that tracks outbound inter-database messages. +pub const ST_OUTBOUND_MSG_ID: TableId = TableId(22); + pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials"; pub const ST_TABLE_NAME: &str = "st_table"; pub const ST_COLUMN_NAME: &str = "st_column"; @@ -109,6 +116,8 @@ pub(crate) const ST_EVENT_TABLE_NAME: &str = "st_event_table"; pub(crate) const ST_TABLE_ACCESSOR_NAME: &str = "st_table_accessor"; pub(crate) const ST_INDEX_ACCESSOR_NAME: &str = "st_index_accessor"; pub(crate) const ST_COLUMN_ACCESSOR_NAME: &str = "st_column_accessor"; +pub(crate) const ST_INBOUND_MSG_NAME: &str = "st_inbound_msg"; +pub(crate) const ST_OUTBOUND_MSG_NAME: &str = "st_outbound_msg"; /// Reserved range of sequence values used for system tables. /// /// Ids for user-created tables will start at `ST_RESERVED_SEQUENCE_RANGE`. @@ -182,7 +191,11 @@ pub fn is_built_in_meta_row(table_id: TableId, row: &ProductValue) -> Result false, + ST_TABLE_ACCESSOR_ID + | ST_INDEX_ACCESSOR_ID + | ST_COLUMN_ACCESSOR_ID + | ST_INBOUND_MSG_ID + | ST_OUTBOUND_MSG_ID => false, TableId(..ST_RESERVED_SEQUENCE_RANGE) => { log::warn!("Unknown system table {table_id:?}"); false @@ -205,7 +218,7 @@ pub enum SystemTable { st_table_accessor, } -pub fn system_tables() -> [TableSchema; 20] { +pub fn system_tables() -> [TableSchema; 22] { [ // The order should match the `id` of the system table, that start with [ST_TABLE_IDX]. 
st_table_schema(), @@ -228,6 +241,8 @@ pub fn system_tables() -> [TableSchema; 20] { st_table_accessor_schema(), st_index_accessor_schema(), st_column_accessor_schema(), + st_inbound_msg_schema(), + st_outbound_schema(), ] } @@ -276,6 +291,8 @@ pub(crate) const ST_EVENT_TABLE_IDX: usize = 16; pub(crate) const ST_TABLE_ACCESSOR_IDX: usize = 17; pub(crate) const ST_INDEX_ACCESSOR_IDX: usize = 18; pub(crate) const ST_COLUMN_ACCESSOR_IDX: usize = 19; +pub(crate) const ST_INBOUND_MSG_IDX: usize = 20; +pub(crate) const ST_OUTBOUND_MSG_IDX: usize = 21; macro_rules! st_fields_enum { ($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => { @@ -450,6 +467,19 @@ st_fields_enum!(enum StColumnAccessorFields { "accessor_name", AccessorName = 2, }); +st_fields_enum!(enum StInboundMsgFields { + "database_identity", DatabaseIdentity = 0, + "last_outbound_msg", LastMsgId = 1, + "result_status", ResultStatus = 2, + "result_payload", ResultPayload = 3, +}); + +st_fields_enum!(enum StOutboundMsgFields { + "msg_id", MsgId = 0, + "outbox_table_id", OutboxTableId = 1, + "row_id", RowId = 2, +}); + /// Helper method to check that a system table has the correct fields. /// Does not check field types since those aren't included in `StFields` types. /// If anything in here is not true, the system is completely broken, so it's fine to assert. 
@@ -668,6 +698,27 @@ fn system_module_def() -> ModuleDef { .with_unique_constraint(st_column_accessor_table_alias_cols) .with_index_no_accessor_name(btree(st_column_accessor_table_alias_cols)); + let st_inbound_msg_type = builder.add_type::(); + builder + .build_table( + ST_INBOUND_MSG_NAME, + *st_inbound_msg_type.as_ref().expect("should be ref"), + ) + .with_type(TableType::System) + .with_unique_constraint(StInboundMsgFields::DatabaseIdentity) + .with_index_no_accessor_name(btree(StInboundMsgFields::DatabaseIdentity)) + .with_primary_key(StInboundMsgFields::DatabaseIdentity); + + let st_outbound_msg_type = builder.add_type::(); + builder + .build_table( + ST_OUTBOUND_MSG_NAME, + *st_outbound_msg_type.as_ref().expect("should be ref"), + ) + .with_type(TableType::System) + .with_auto_inc_primary_key(StOutboundMsgFields::MsgId) + .with_index_no_accessor_name(btree(StOutboundMsgFields::MsgId)); + let result = builder .finish() .try_into() @@ -693,6 +744,8 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_TABLE_ACCESSOR_NAME); validate_system_table::(&result, ST_INDEX_ACCESSOR_NAME); validate_system_table::(&result, ST_COLUMN_ACCESSOR_NAME); + validate_system_table::(&result, ST_INBOUND_MSG_NAME); + validate_system_table::(&result, ST_OUTBOUND_MSG_NAME); result } @@ -741,6 +794,8 @@ lazy_static::lazy_static! { m.insert("st_index_accessor_accessor_name_key", ConstraintId(23)); m.insert("st_column_accessor_table_name_col_name_key", ConstraintId(24)); m.insert("st_column_accessor_table_name_accessor_name_key", ConstraintId(25)); + m.insert("st_inbound_msg_database_identity_key", ConstraintId(26)); + m.insert("st_outbound_msg_msg_id_key", ConstraintId(27)); m }; } @@ -779,6 +834,8 @@ lazy_static::lazy_static! 
{ m.insert("st_index_accessor_accessor_name_idx_btree", IndexId(27)); m.insert("st_column_accessor_table_name_col_name_idx_btree", IndexId(28)); m.insert("st_column_accessor_table_name_accessor_name_idx_btree", IndexId(29)); + m.insert("st_inbound_msg_database_identity_idx_btree", IndexId(30)); + m.insert("st_outbound_msg_msg_id_idx_btree", IndexId(31)); m }; } @@ -795,6 +852,7 @@ lazy_static::lazy_static! { m.insert("st_sequence_sequence_id_seq", SequenceId(5)); m.insert("st_view_view_id_seq", SequenceId(6)); m.insert("st_view_arg_id_seq", SequenceId(7)); + m.insert("st_outbound_msg_msg_id_seq", SequenceId(8)); m }; } @@ -940,6 +998,14 @@ fn st_column_accessor_schema() -> TableSchema { st_schema(ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ACCESSOR_ID) } +fn st_inbound_msg_schema() -> TableSchema { + st_schema(ST_INBOUND_MSG_NAME, ST_INBOUND_MSG_ID) +} + +fn st_outbound_schema() -> TableSchema { + st_schema(ST_OUTBOUND_MSG_NAME, ST_OUTBOUND_MSG_ID) +} + /// If `table_id` refers to a known system table, return its schema. /// /// Used when restoring from a snapshot; system tables are reinstantiated with this schema, @@ -968,6 +1034,8 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option { ST_TABLE_ACCESSOR_ID => Some(st_table_accessor_schema()), ST_INDEX_ACCESSOR_ID => Some(st_index_accessor_schema()), ST_COLUMN_ACCESSOR_ID => Some(st_column_accessor_schema()), + ST_INBOUND_MSG_ID => Some(st_inbound_msg_schema()), + ST_OUTBOUND_MSG_ID => Some(st_outbound_schema()), _ => None, } } @@ -1859,6 +1927,88 @@ impl From for ProductValue { } } +/// System Table [ST_INBOUND_MSG_NAME] +/// Tracks the last message id received from each sender database for deduplication. +/// Also stores the result of the reducer call for returning on subsequent duplicate requests. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum StInboundMsgResultStatus { + /// Reducer ran and committed successfully. + Success = 1, + /// Reducer ran and returned a user-visible error. 
+ ReducerError = 2, +} + +impl TryFrom for StInboundMsgResultStatus { + type Error = &'static str; + + fn try_from(value: u8) -> Result { + match value { + 1 => Ok(Self::Success), + 2 => Ok(Self::ReducerError), + _ => Err("invalid st_inbound_msg result status"), + } + } +} + +impl From for u8 { + fn from(value: StInboundMsgResultStatus) -> Self { + value as u8 + } +} + +impl_st!([] StInboundMsgResultStatus, AlgebraicType::U8); +impl<'de> Deserialize<'de> for StInboundMsgResultStatus { + fn deserialize>(deserializer: D) -> Result { + let value = u8::deserialize(deserializer)?; + Self::try_from(value).map_err(D::Error::custom) + } +} +impl_serialize!([] StInboundMsgResultStatus, (self, ser) => u8::from(*self).serialize(ser)); + +#[derive(Debug, Clone, PartialEq, Eq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct StInboundMsgRow { + pub database_identity: IdentityViaU256, + pub last_outbound_msg: u64, + pub result_status: StInboundMsgResultStatus, + /// Reducer return payload encoded as raw bytes. + /// For [`StInboundMsgResultStatus::Success`], this stores the committed reducer return value. + /// For [`StInboundMsgResultStatus::ReducerError`], this stores the error message bytes. + pub result_payload: Bytes, +} + +impl TryFrom> for StInboundMsgRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +/// System Table [ST_OUTBOUND_MSG_NAME] +/// Tracks undelivered outbound inter-database messages (outbox pattern). +/// A row's presence means the message has not yet been fully processed. +/// Once delivery and the on_result callback complete, the row is deleted. +/// +/// Delivery data (target identity, reducer name, args) is read from the outbox table row. +/// The on_result_reducer is read from the outbox table's schema (TableSchema::on_result_reducer). 
+#[derive(Debug, Clone, PartialEq, Eq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct StOutboundMsgRow { + pub msg_id: u64, + /// The TableId of the outbox table that generated this message. + pub outbox_table_id: u32, + /// The auto-inc primary key (col 0) of the row in the outbox table. + pub row_id: u64, +} + +impl TryFrom> for StOutboundMsgRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + thread_local! { static READ_BUF: RefCell> = const { RefCell::new(Vec::new()) }; } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index e1b99825ae2..fcf7a8caf6a 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -531,6 +531,8 @@ impl Program { pub struct InsertFlags { /// Is the table a scheduler table? pub is_scheduler_table: bool, + /// Is the table an outbox table? + pub is_outbox_table: bool, } /// Additional information about an update operation. diff --git a/crates/lib/src/db/raw_def/v10.rs b/crates/lib/src/db/raw_def/v10.rs index a801ea286be..2dc0b47946e 100644 --- a/crates/lib/src/db/raw_def/v10.rs +++ b/crates/lib/src/db/raw_def/v10.rs @@ -89,6 +89,12 @@ pub enum RawModuleDefV10Section { /// Names provided explicitly by the user that do not follow from the case conversion policy. ExplicitNames(ExplicitNames), + + /// Outbox table definitions. + /// + /// Each entry marks a table as an outbox table, + /// specifying the remote reducer to call and optionally a local callback reducer. + Outboxes(Vec), } #[derive(Debug, Clone, Copy, Default, SpacetimeType)] @@ -264,6 +270,31 @@ pub struct RawColumnDefaultValueV10 { pub value: Box<[u8]>, } +/// Marks a table as an outbox table. +/// +/// The table must have: +/// - Col 0: `u64` with `#[primary_key] #[auto_inc]` — the row ID stored in `st_outbound_msg`. +/// - Col 1: `Identity` — the target database identity. +/// - Remaining cols: arguments forwarded verbatim to the remote reducer. 
+/// +/// The `remote_reducer` is the name of the reducer to call on the target database. +/// If `on_result_reducer` is set, that local reducer is called when delivery completes, +/// with signature `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result)`. +#[derive(Debug, Clone, SpacetimeType)] +#[sats(crate = crate)] +#[cfg_attr(feature = "test", derive(PartialEq, Eq, PartialOrd, Ord))] +pub struct RawOutboxDefV10 { + /// The `source_name` of the outbox table (as given in `accessor = ...`). + pub table_name: RawIdentifier, + + /// The name of the reducer to call on the target database. + pub remote_reducer: RawIdentifier, + + /// The name of the local reducer to call with the delivery result. + /// If `None`, no callback is made after delivery. + pub on_result_reducer: Option, +} + /// A reducer definition. #[derive(Debug, Clone, SpacetimeType)] #[sats(crate = crate)] @@ -584,6 +615,14 @@ impl RawModuleDefV10 { .expect("Tables section must exist for tests") } + /// Get the outboxes section, if present. + pub fn outboxes(&self) -> Option<&Vec> { + self.sections.iter().find_map(|s| match s { + RawModuleDefV10Section::Outboxes(outboxes) => Some(outboxes), + _ => None, + }) + } + // Get the row-level security section, if present. pub fn row_level_security(&self) -> Option<&Vec> { self.sections.iter().find_map(|s| match s { @@ -785,6 +824,24 @@ impl RawModuleDefV10Builder { } } + /// Get mutable access to the outboxes section, creating it if missing. 
+ fn outboxes_mut(&mut self) -> &mut Vec { + let idx = self + .module + .sections + .iter() + .position(|s| matches!(s, RawModuleDefV10Section::Outboxes(_))) + .unwrap_or_else(|| { + self.module.sections.push(RawModuleDefV10Section::Outboxes(Vec::new())); + self.module.sections.len() - 1 + }); + + match &mut self.module.sections[idx] { + RawModuleDefV10Section::Outboxes(outboxes) => outboxes, + _ => unreachable!("Just ensured Outboxes section exists"), + } + } + /// Get mutable access to the case conversion policy, creating it if missing. fn explicit_names_mut(&mut self) -> &mut ExplicitNames { let idx = self @@ -1040,6 +1097,24 @@ impl RawModuleDefV10Builder { }); } + /// Register an outbox table. + /// + /// `table_name` is the `source_name` of the table (i.e. `accessor =` value). + /// `remote_reducer` is the reducer to call on the target database. + /// `on_result_reducer` is an optional local reducer called with the delivery result. + pub fn add_outbox( + &mut self, + table_name: impl Into, + remote_reducer: impl Into, + on_result_reducer: Option>, + ) { + self.outboxes_mut().push(RawOutboxDefV10 { + table_name: table_name.into(), + remote_reducer: remote_reducer.into(), + on_result_reducer: on_result_reducer.map(Into::into), + }); + } + /// Add a row-level security policy to the module. /// /// The `sql` expression should be a valid SQL expression that will be used to filter rows. 
diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 0db3e7910ce..53487aa457b 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1547,6 +1547,7 @@ mod tests { primary_key.map(ColId::from), false, None, + None, ))) } diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 89c201e3f85..ce4aee643c0 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -33,8 +33,8 @@ use spacetimedb_data_structures::map::{Equivalent, HashMap}; use spacetimedb_lib::db::raw_def; use spacetimedb_lib::db::raw_def::v10::{ ExplicitNames, RawConstraintDefV10, RawIndexDefV10, RawLifeCycleReducerDefV10, RawModuleDefV10, - RawModuleDefV10Section, RawProcedureDefV10, RawReducerDefV10, RawRowLevelSecurityDefV10, RawScheduleDefV10, - RawScopedTypeNameV10, RawSequenceDefV10, RawTableDefV10, RawTypeDefV10, RawViewDefV10, + RawModuleDefV10Section, RawOutboxDefV10, RawProcedureDefV10, RawReducerDefV10, RawRowLevelSecurityDefV10, + RawScheduleDefV10, RawScopedTypeNameV10, RawSequenceDefV10, RawTableDefV10, RawTypeDefV10, RawViewDefV10, }; use spacetimedb_lib::db::raw_def::v9::{ Lifecycle, RawColumnDefaultValueV9, RawConstraintDataV9, RawConstraintDefV9, RawIndexAlgorithm, RawIndexDefV9, @@ -518,9 +518,10 @@ impl From for RawModuleDefV10 { sections.push(RawModuleDefV10Section::Types(raw_types)); } - // Collect schedules from tables (V10 stores them in a separate section). + // Collect schedules and outboxes from tables (V10 stores them in separate sections). // Also collect ExplicitNames for tables: accessor_name → source_name, name → canonical_name. 
let mut schedules = Vec::new(); + let mut outboxes: Vec = Vec::new(); let raw_tables: Vec = tables .into_values() .map(|td| { @@ -537,6 +538,13 @@ impl From for RawModuleDefV10 { function_name: sched.function_name.into(), }); } + if let Some(ref outbox) = td.outbox { + outboxes.push(RawOutboxDefV10 { + table_name: td.accessor_name.clone().into(), + remote_reducer: outbox.remote_reducer.clone().into(), + on_result_reducer: outbox.on_result_reducer.clone().map(Into::into), + }); + } td.into() }) .collect(); @@ -593,6 +601,10 @@ impl From for RawModuleDefV10 { sections.push(RawModuleDefV10Section::Schedules(schedules)); } + if !outboxes.is_empty() { + sections.push(RawModuleDefV10Section::Outboxes(outboxes)); + } + if !raw_lifecycle.is_empty() { sections.push(RawModuleDefV10Section::LifeCycleReducers(raw_lifecycle)); } @@ -692,6 +704,27 @@ pub struct TableDef { /// Event tables persist to the commitlog but are not merged into committed state. /// Their rows are only visible to V2 subscribers in the transaction that inserted them. pub is_event: bool, + + /// Whether this is an outbox table. + /// `None` unless this table was declared with `#[spacetimedb::table(outbox(...))]`. + pub outbox: Option, +} + +/// Configuration for an outbox table used in inter-database communication. +/// +/// An outbox table must have: +/// - Col 0: `u64` with `#[primary_key] #[auto_inc]` — row ID tracked in `st_outbound_msg`. +/// - Col 1: `Identity` — target database identity. +/// - Remaining cols: arguments forwarded to `remote_reducer` on the target database. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OutboxDef { + /// The name of the reducer to call on the target database. + pub remote_reducer: Identifier, + + /// The local reducer called with the delivery result, if any. + /// + /// Signature: `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result)`. 
+ pub on_result_reducer: Option, } impl TableDef { @@ -719,6 +752,7 @@ impl From for RawTableDefV9 { table_type, table_access, is_event: _, // V9 does not support event tables; ignore when converting back + outbox: _, // V9 does not support outbox tables; ignore when converting back .. } = val; @@ -751,6 +785,7 @@ impl From for RawTableDefV10 { table_access, is_event, accessor_name, + outbox: _, // V10 stores outbox definitions in a separate Outboxes section; handled in From. } = val; RawTableDefV10 { @@ -793,6 +828,7 @@ impl From for TableDef { table_access: if is_public { Public } else { Private }, is_event: false, accessor_name, + outbox: None, } } } diff --git a/crates/schema/src/def/validate/v10.rs b/crates/schema/src/def/validate/v10.rs index 7ebbaae06d4..94226ccb636 100644 --- a/crates/schema/src/def/validate/v10.rs +++ b/crates/schema/src/def/validate/v10.rs @@ -188,6 +188,9 @@ pub fn validate(def: RawModuleDefV10) -> Result { }) .unwrap_or_else(|| Ok(Vec::new())); + // Collect raw outbox definitions from the Outboxes section for post-validation attachment. + let raw_outboxes: Vec = def.outboxes().map(|v| v.to_vec()).unwrap_or_default(); + // Validate lifecycle reducers - they reference reducers by name let lifecycle_validations = reducers .as_ref() @@ -242,6 +245,9 @@ pub fn validate(def: RawModuleDefV10) -> Result { // Attach schedules to their respective tables attach_schedules_to_tables(&mut tables, schedules)?; + // Attach outbox definitions to their respective tables + attach_outboxes_to_tables(&mut tables, raw_outboxes)?; + check_scheduled_functions_exist(&mut tables, &reducers, &procedures)?; change_scheduled_functions_and_lifetimes_visibility(&tables, &mut reducers, &mut procedures)?; assign_query_view_primary_keys(&tables, &mut views); @@ -510,6 +516,7 @@ impl<'a> ModuleValidatorV10<'a> { table_access, is_event, accessor_name: identifier(raw_table_name)?, + outbox: None, // V10 attaches outbox defs from the Outboxes section; handled in validate(). 
}) } @@ -830,6 +837,76 @@ fn attach_schedules_to_tables( Ok(()) } +/// Attach outbox definitions from the `Outboxes` section to their respective tables. +/// +/// Also validates outbox table structure: +/// - Col 0: `u64` with `#[primary_key] #[auto_inc]` +/// - Col 1: `Identity` stored as U256 +/// - At least 2 columns total +fn attach_outboxes_to_tables( + tables: &mut HashMap, + raw_outboxes: Vec, +) -> Result<()> { + use spacetimedb_sats::AlgebraicType; + + for raw in raw_outboxes { + let table_ident = identifier(raw.table_name.clone())?; + + // Find the table by its accessor_name (source_name in the wire format). + let table = tables + .values_mut() + .find(|t| *t.accessor_name == *table_ident) + .ok_or_else(|| ValidationError::OutboxTableNotFound { + table_name: raw.table_name.clone(), + })?; + + if table.outbox.is_some() { + return Err(ValidationError::DuplicateOutbox { + table: table.name.clone(), + } + .into()); + } + + // Validate col count. + if table.columns.len() < 2 { + return Err(ValidationError::OutboxTooFewColumns { + table: table.name.clone(), + } + .into()); + } + + // Validate col 0: must be u64 (auto_inc + PK is enforced by the macro; we just check the type). + let col0_ty = &table.columns[0].ty; + if *col0_ty != AlgebraicType::U64 { + return Err(ValidationError::OutboxInvalidIdColumn { + table: table.name.clone(), + found: col0_ty.clone().into(), + } + .into()); + } + + // Validate col 1: must be Identity (or its raw U256 wire representation). 
+ let col1_ty = &table.columns[1].ty; + if *col1_ty != AlgebraicType::U256 && !col1_ty.is_identity() { + return Err(ValidationError::OutboxInvalidTargetColumn { + table: table.name.clone(), + found: col1_ty.clone().into(), + } + .into()); + } + + let remote_reducer = identifier(raw.remote_reducer)?; + let on_result_reducer = raw.on_result_reducer.map(identifier).transpose()?; + + table.outbox = Some(crate::def::OutboxDef { + remote_reducer, + on_result_reducer, + }); + } + + Ok(()) +} + fn assign_query_view_primary_keys(tables: &IdentifierMap, views: &mut IndexMap) { let primary_key_for_product_type_ref = |product_type_ref: AlgebraicTypeRef| { let mut primary_key = None; @@ -1172,6 +1249,35 @@ mod tests { ); } + #[test] + fn test_outbox_accepts_identity_target_column() { + let mut builder = RawModuleDefV10Builder::new(); + + builder + .build_table_with_new_type( + "outbound_pings", + ProductType::from([ + ("id", AlgebraicType::U64), + ("target", AlgebraicType::identity()), + ("payload", AlgebraicType::String), + ]), + true, + ) + .with_auto_inc_primary_key(0) + .with_index_no_accessor_name(direct(0), "outbound_pings_id") + .finish(); + + builder.add_reducer("send_ping", ProductType::unit()); + builder.add_reducer("receive_ping", ProductType::from([("payload", AlgebraicType::String)])); + builder.add_outbox("outbound_pings", "receive_ping", Option::<&str>::None); + + let validated: Result = builder.finish().try_into(); + assert!( + validated.is_ok(), + "expected outbox validation to accept Identity target column, got {validated:?}", + ); + } + #[test] fn invalid_product_type_ref() { let mut builder = RawModuleDefV10Builder::new(); diff --git a/crates/schema/src/def/validate/v9.rs b/crates/schema/src/def/validate/v9.rs index d040435afe5..87777858579 100644 --- a/crates/schema/src/def/validate/v9.rs +++ b/crates/schema/src/def/validate/v9.rs @@ -324,6 +324,7 @@ impl ModuleValidatorV9<'_> { table_access, is_event: false, // V9 does not support event tables 
accessor_name: name, + outbox: None, // V9 does not support outbox tables }) } diff --git a/crates/schema/src/error.rs b/crates/schema/src/error.rs index 06f284998b5..2b074ab7a75 100644 --- a/crates/schema/src/error.rs +++ b/crates/schema/src/error.rs @@ -152,6 +152,22 @@ pub enum ValidationError { ok_type: PrettyAlgebraicType, err_type: PrettyAlgebraicType, }, + #[error("outbox table {table_name} not found in module")] + OutboxTableNotFound { table_name: RawIdentifier }, + #[error("table {table} is declared as an outbox table multiple times")] + DuplicateOutbox { table: Identifier }, + #[error("outbox table {table} col 0 must be `u64` with `#[primary_key] #[auto_inc]`, found {found}")] + OutboxInvalidIdColumn { + table: Identifier, + found: PrettyAlgebraicType, + }, + #[error("outbox table {table} col 1 must be `Identity` (U256), found {found}")] + OutboxInvalidTargetColumn { + table: Identifier, + found: PrettyAlgebraicType, + }, + #[error("outbox table {table} must have at least 2 columns (id, target_identity)")] + OutboxTooFewColumns { table: Identifier }, } /// A wrapper around an `AlgebraicType` that implements `fmt::Display`. diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index e62b9dc6963..51907a04a3a 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -201,6 +201,9 @@ pub struct TableSchema { /// Whether this is an event table. pub is_event: bool, + /// Outbox configuration if this is an outbox table; `None` for non-outbox tables. + pub outbox: Option, + /// Cache for `row_type_for_table` in the data store. 
pub row_type: ProductType, } @@ -227,6 +230,7 @@ impl TableSchema { primary_key: Option, is_event: bool, alias: Option, + outbox: Option, ) -> Self { Self { row_type: columns_to_row_type(&columns), @@ -243,6 +247,7 @@ impl TableSchema { primary_key, is_event, alias, + outbox, } } @@ -281,6 +286,7 @@ impl TableSchema { None, false, None, + None, ) } @@ -803,6 +809,7 @@ impl TableSchema { view_primary_key, false, None, + None, ) } @@ -974,6 +981,7 @@ impl TableSchema { if *is_anonymous { view_primary_key } else { None }, false, Some(accessor_name.clone()), + None, ) } } @@ -1005,6 +1013,7 @@ impl Schema for TableSchema { table_access, is_event, accessor_name, + outbox, .. } = def; @@ -1031,6 +1040,11 @@ impl Schema for TableSchema { .as_ref() .map(|schedule| ScheduleSchema::from_module_def(module_def, schedule, table_id, ScheduleId::SENTINEL)); + let outbox_schema = outbox.as_ref().map(|o| OutboxSchema { + remote_reducer: o.remote_reducer.clone(), + on_result_reducer: o.on_result_reducer.clone(), + }); + TableSchema::new( table_id, TableName::new(name.clone()), @@ -1045,6 +1059,7 @@ impl Schema for TableSchema { *primary_key, *is_event, Some(accessor_name.clone()), + outbox_schema, ) } @@ -1427,6 +1442,15 @@ impl Schema for ScheduleSchema { } } +/// Marks a table as an outbox table. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OutboxSchema { + /// The name of the reducer to invoke on the target database. + pub remote_reducer: Identifier, + /// The local reducer called with the delivery result, if any. + pub on_result_reducer: Option, +} + /// A struct representing the schema of a database index. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct IndexSchema { diff --git a/crates/smoketests/Cargo.toml b/crates/smoketests/Cargo.toml index bfa718e7daa..380d6907c27 100644 --- a/crates/smoketests/Cargo.toml +++ b/crates/smoketests/Cargo.toml @@ -16,6 +16,7 @@ which = "8.0.0" [dev-dependencies] spacetimedb-core.workspace = true +spacetimedb-lib.workspace = true cargo_metadata.workspace = true assert_cmd = "2" predicates = "3" diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index aa54aea04ef..62be4c578e6 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -1314,6 +1314,17 @@ log = "0.4" self.api_call_internal(method, path, body, "") } + /// Makes an HTTP API call with an optional body and explicit extra headers. + pub fn api_call_with_body_and_headers( + &self, + method: &str, + path: &str, + body: Option<&[u8]>, + extra_headers: &str, + ) -> Result { + self.api_call_internal(method, path, body, extra_headers) + } + /// Makes an HTTP API call with a JSON body. 
pub fn api_call_json(&self, method: &str, path: &str, json_body: &str) -> Result { self.api_call_internal( diff --git a/crates/smoketests/tests/smoketests/call_from_database.rs b/crates/smoketests/tests/smoketests/call_from_database.rs new file mode 100644 index 00000000000..c67d9d4871f --- /dev/null +++ b/crates/smoketests/tests/smoketests/call_from_database.rs @@ -0,0 +1,109 @@ +use spacetimedb_lib::bsatn; +use spacetimedb_smoketests::Smoketest; + +fn idc_path(database_identity: &str, reducer: &str, sender_identity: &str, msg_id: u64) -> String { + format!( + "/v1/database/{database_identity}/call-from-database/{reducer}?sender_identity={sender_identity}&msg_id={msg_id}" + ) +} + +fn post_idc(test: &Smoketest, reducer: &str, sender_identity: &str, msg_id: u64, body: &[u8]) -> (u16, String) { + let database_identity = test.database_identity.as_ref().expect("No database published"); + let response = test + .api_call_with_body_and_headers( + "POST", + &idc_path(database_identity, reducer, sender_identity, msg_id), + Some(body), + "Content-Type: application/octet-stream\r\n", + ) + .expect("IDC HTTP call failed"); + let text = response.text().expect("IDC response body should be valid UTF-8"); + (response.status_code, text) +} + +#[test] +fn test_call_from_database_deduplicates_successful_reducer() { + let module_code = r#" +use spacetimedb::{ReducerContext, Table}; + +#[spacetimedb::table(accessor = deliveries, public)] +pub struct Delivery { + #[primary_key] + name: String, + count: u64, +} + +#[spacetimedb::reducer(init)] +pub fn init(ctx: &ReducerContext) { + ctx.db.deliveries().insert(Delivery { + name: "remote".to_string(), + count: 0, + }); +} + +#[spacetimedb::reducer] +pub fn accept(ctx: &ReducerContext) { + let mut row = ctx + .db + .deliveries() + .name() + .find("remote".to_string()) + .expect("delivery row should exist"); + row.count += 1; + ctx.db.deliveries().name().update(row); +} +"#; + + let test = 
Smoketest::builder().module_code(module_code).build(); + let body = bsatn::to_vec(&()).expect("BSATN encoding should succeed"); + let sender_identity = "00000000000000000000000000000000000000000000000000000000000000aa"; + + let first = post_idc(&test, "accept", sender_identity, 7, &body); + let second = post_idc(&test, "accept", sender_identity, 7, &body); + + assert_eq!(first.0, 200, "first IDC call should succeed: {first:?}"); + assert_eq!(first.1, ""); + assert_eq!(second.0, 200, "duplicate IDC call should still succeed: {second:?}"); + assert_eq!(second.1, ""); + + let output = test.sql("SELECT name, count FROM deliveries").unwrap(); + assert!( + output.contains(r#""remote" | 1"#), + "expected the deduplicated reducer to leave a single delivery row, got:\n{output}", + ); +} + +#[test] +fn test_call_from_database_replays_stored_reducer_error_without_rerunning() { + let module_code = r#" +use spacetimedb::{log, ReducerContext}; + +#[spacetimedb::reducer] +pub fn always_fail(_ctx: &ReducerContext) -> Result<(), String> { + log::info!("IDC failing reducer executed"); + Err("boom".to_string()) +} +"#; + + let test = Smoketest::builder().module_code(module_code).build(); + let body = bsatn::to_vec(&()).expect("BSATN encoding should succeed"); + let sender_identity = "00000000000000000000000000000000000000000000000000000000000000bb"; + + let first = post_idc(&test, "always_fail", sender_identity, 9, &body); + let second = post_idc(&test, "always_fail", sender_identity, 9, &body); + + assert_eq!(first.0, 422, "first IDC call should surface reducer error: {first:?}"); + assert_eq!(first.1.trim(), "boom"); + assert_eq!( + second.0, 422, + "duplicate IDC call should replay reducer error: {second:?}" + ); + assert_eq!(second.1.trim(), "boom"); + + let logs = test.logs(100).unwrap(); + let executions = logs + .iter() + .filter(|line| line.contains("IDC failing reducer executed")) + .count(); + assert_eq!(executions, 1, "duplicate IDC request should not rerun the reducer"); 
+} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index f5053652dd3..39142df6199 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -3,6 +3,7 @@ mod add_remove_index; mod auto_inc; mod auto_migration; mod call; +mod call_from_database; mod change_host_type; mod cli; mod client_connection_errors; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 68450a1b283..4643d8fec6e 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -118,6 +118,12 @@ impl StandaloneEnv { pub fn bsatn_rlb_pool(&self) -> &BsatnRowListBuilderPool { &self.host_controller.bsatn_rlb_pool } + + pub fn set_idc_http_port(&self, port: u16) -> anyhow::Result<()> { + self.host_controller + .set_idc_http_port(port) + .map_err(|_| anyhow::anyhow!("port already set")) + } } #[derive(Debug, thiserror::Error)] diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index a54e3cb39e9..dd32954c5d2 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -253,7 +253,9 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { "failed to bind the SpacetimeDB server to '{listen_addr}', please check that the address is valid and not already in use" ))?; socket2::SockRef::from(&tcp).set_nodelay(true)?; - log::info!("Starting SpacetimeDB listening on {}", tcp.local_addr()?); + let local_addr = tcp.local_addr()?; + ctx.set_idc_http_port(local_addr.port())?; + log::info!("Starting SpacetimeDB listening on {local_addr}"); if let Some(pg_port) = pg_port { let server_addr = listen_addr.split(':').next().unwrap(); diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 3540c5bdff6..4583f419864 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -101,6 +101,11 @@ pub struct Table { /// /// This is an 
optimization to avoid checking the schema in e.g., `InstanceEnv::{insert, update}`. is_scheduler: bool, + /// Indicates whether this is an outbox table or not. + /// + /// Set from `schema.outbox.is_some()`. + /// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::insert`. + is_outbox: bool, } type StaticLayoutInTable = Option<(StaticLayout, StaticBsatnValidator)>; @@ -213,6 +218,7 @@ impl MemoryUsage for Table { row_count, blob_store_bytes, is_scheduler, + is_outbox, } = self; inner.heap_usage() + pointer_map.heap_usage() @@ -221,6 +227,7 @@ impl MemoryUsage for Table { + row_count.heap_usage() + blob_store_bytes.heap_usage() + is_scheduler.heap_usage() + + is_outbox.heap_usage() } } @@ -537,6 +544,11 @@ impl Table { self.is_scheduler } + /// Returns whether this is an outbox table (i.e., `schema.outbox.is_some()`). + pub fn is_outbox(&self) -> bool { + self.is_outbox + } + /// Check if the `row` conflicts with any unique index on `self`, /// and if there is a conflict, return `Err`. /// @@ -2297,6 +2309,7 @@ impl Table { squashed_offset: SquashedOffset, pointer_map: Option, ) -> Self { + let is_outbox = schema.outbox.is_some(); Self { inner: TableInner { row_layout, @@ -2305,6 +2318,7 @@ impl Table { pages: Pages::default(), }, is_scheduler: schema.schedule.is_some(), + is_outbox, schema, indexes: BTreeMap::new(), pointer_map, diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 318fff8d0cf..a681578c5a1 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -209,6 +209,7 @@ impl CompiledModule { ) .await .unwrap(); + env.set_idc_http_port(80).unwrap(); // TODO: Fix this when we update identity generation. let identity = Identity::ZERO; let db_identity = SpacetimeAuth::alloc(&env).await.unwrap().claims.identity;