diff --git a/crates/bindings-macro/src/lib.rs b/crates/bindings-macro/src/lib.rs index 8fa9705ca0e..fdea5f64b86 100644 --- a/crates/bindings-macro/src/lib.rs +++ b/crates/bindings-macro/src/lib.rs @@ -132,6 +132,8 @@ mod sym { symbol!(update); symbol!(default); symbol!(event); + symbol!(outbox); + symbol!(on_result); symbol!(u8); symbol!(i8); diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs index 1cbba4c5ca1..fd8ac7196dd 100644 --- a/crates/bindings-macro/src/table.rs +++ b/crates/bindings-macro/src/table.rs @@ -22,6 +22,27 @@ pub(crate) struct TableArgs { accessor: Ident, indices: Vec, event: Option, + outbox: Option, +} + +/// Parsed from `outbox(remote_reducer_fn, on_result = local_reducer_fn)`. +/// +/// For backwards compatibility, `on_result(local_reducer_fn)` is also accepted as a sibling +/// table attribute and attached to the previously-declared outbox. +struct OutboxArg { + span: Span, + /// Path to the remote-side reducer function, used only for its final path segment. + remote_reducer: Path, + /// Path to the local `on_result` reducer, if any. + on_result_reducer: Option, +} + +fn path_tail_name(path: &Path) -> LitStr { + let segment = path + .segments + .last() + .expect("syn::Path should always contain at least one segment"); + LitStr::new(&segment.ident.to_string(), segment.ident.span()) } enum TableAccess { @@ -82,6 +103,7 @@ impl TableArgs { let mut name: Option = None; let mut indices = Vec::new(); let mut event = None; + let mut outbox: Option = None; syn::meta::parser(|meta| { match_meta!(match meta { sym::public => { @@ -149,6 +171,30 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {sym}` with `accessor check_duplicate(&event, &meta)?; event = Some(meta.path.span()); } + sym::outbox => { + check_duplicate_msg(&outbox, &meta, "already specified outbox")?; + outbox = Some(OutboxArg::parse_meta(meta)?); + } + sym::on_result => { + // `on_result` must be specified alongside `outbox`. + // We parse it here and attach it to the outbox arg below. + let span = meta.path.span(); + let on_result_path = OutboxArg::parse_single_path_meta(meta)?; + match &mut outbox { + Some(ob) => { + if ob.on_result_reducer.is_some() { + return Err(syn::Error::new(span, "already specified on_result")); + } + ob.on_result_reducer = Some(on_result_path); + } + None => { + return Err(syn::Error::new( + span, + "on_result requires outbox to be specified first: `outbox(remote_reducer), on_result(local_reducer)`", + )) + } + } + } }); Ok(()) }) @@ -188,6 +234,7 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {name_str_value:?}` wi indices, name, event, + outbox, }) } } @@ -231,6 +278,84 @@ impl ScheduledArg { } } +impl OutboxArg { + /// Parse `outbox(remote_reducer_path, on_result = local_reducer_path)`. + /// + /// For backwards compatibility, `on_result` may also be parsed separately via + /// `parse_single_path_meta` and attached afterwards. + fn parse_meta(meta: ParseNestedMeta) -> syn::Result { + let span = meta.path.span(); + let mut remote_reducer: Option = None; + let mut on_result_reducer: Option = None; + + meta.parse_nested_meta(|meta| { + if meta.input.peek(Token![=]) { + if meta.path.is_ident("on_result") { + check_duplicate_msg( + &on_result_reducer, + &meta, + "can only specify one on_result reducer", + )?; + on_result_reducer = Some(meta.value()?.parse()?); + Ok(()) + } else { + Err(meta.error( + "outbox only supports `on_result = my_local_reducer` as a named argument", + )) + } + } else if meta.input.peek(syn::token::Paren) { + Err(meta.error( + "outbox expects a remote reducer path and optional `on_result = my_local_reducer`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`", + )) + } else if meta.path.is_ident("on_result") { + Err(meta.error( + "outbox `on_result` must use `=`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`", + )) + } else { + check_duplicate_msg( + &remote_reducer, + &meta, + "can only specify one remote reducer for outbox", + )?; + remote_reducer = Some(meta.path); + Ok(()) + } + })?; + + let remote_reducer = remote_reducer + .ok_or_else(|| syn::Error::new(span, "outbox requires a remote reducer: `outbox(my_remote_reducer)`"))?; + + Ok(Self { + span, + remote_reducer, + on_result_reducer, + }) + } + + /// Parse `on_result(local_reducer_path)` and return the path. + fn parse_single_path_meta(meta: ParseNestedMeta) -> syn::Result { + let span = meta.path.span(); + let mut result: Option = None; + + meta.parse_nested_meta(|meta| { + if meta.input.peek(syn::Token![=]) || meta.input.peek(syn::token::Paren) { + Err(meta.error("on_result takes a single function path, e.g. `on_result(my_local_reducer)`")) + } else { + check_duplicate_msg(&result, &meta, "can only specify one on_result reducer")?; + result = Some(meta.path); + Ok(()) + } + })?; + + result.ok_or_else(|| { + syn::Error::new( + span, + "on_result requires a local reducer: `on_result(my_local_reducer)`", + ) + }) + } +} + impl IndexArg { fn parse_meta(meta: ParseNestedMeta) -> syn::Result { let mut accessor = None; @@ -1043,6 +1168,9 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R } }; + // Save a clone before the schedule closure captures `primary_key_column` by move. + let primary_key_column_for_outbox = primary_key_column.clone(); + let (schedule, schedule_typecheck) = args .scheduled .as_ref() @@ -1101,6 +1229,49 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R .unzip(); let schedule = schedule.into_iter(); + let (outbox, outbox_typecheck) = args + .outbox + .as_ref() + .map(|ob| { + let primary_key_column = primary_key_column_for_outbox.ok_or_else(|| { + syn::Error::new( + ob.span, + "outbox tables must have a `#[primary_key] #[auto_inc]` u64 column at position 0 (the row_id)", + ) + })?; + if primary_key_column.index != 0 { + return Err(syn::Error::new( + ob.span, + "outbox tables must have the `#[primary_key] #[auto_inc]` column as the first column (col 0)", + )); + } + + let remote_reducer_name = path_tail_name(&ob.remote_reducer); + let on_result_reducer_name = match ob.on_result_reducer.as_ref().map(path_tail_name) { + Some(r) => quote!(Some(#r)), + None => quote!(None), + }; + let desc = quote!(spacetimedb::table::OutboxDesc { + remote_reducer_name: #remote_reducer_name, + on_result_reducer_name: #on_result_reducer_name, + }); + + let primary_key_ty = primary_key_column.ty; + let callback_typecheck = ob + .on_result_reducer + .as_ref() + .map(|r| quote!(spacetimedb::rt::outbox_typecheck::<#original_struct_ident>(#r);)); + let typecheck = quote! { + spacetimedb::rt::assert_outbox_table_primary_key::<#primary_key_ty>(); + #callback_typecheck + }; + + Ok((desc, typecheck)) + }) + .transpose()? + .unzip(); + let outbox = outbox.into_iter(); + let unique_err = if !unique_columns.is_empty() { quote!(spacetimedb::UniqueConstraintViolation) } else { @@ -1134,6 +1305,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R #(const PRIMARY_KEY: Option = Some(#primary_col_id);)* const SEQUENCES: &'static [u16] = &[#(#sequence_col_ids),*]; #(const SCHEDULE: Option> = Some(#schedule);)* + #(const OUTBOX: Option> = Some(#outbox);)* #table_id_from_name_func #default_fn @@ -1285,6 +1457,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R const _: () = { #(let _ = <#field_types as spacetimedb::rt::TableColumn>::_ITEM;)* #schedule_typecheck + #outbox_typecheck #default_type_check }; diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index d6d55eba5f4..3214d83097c 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -533,6 +533,21 @@ impl TableColumn for T {} /// Assert that the primary_key column of a scheduled table is a u64. pub const fn assert_scheduled_table_primary_key() {} +/// Assert that the primary_key column of an outbox table is a u64. +pub const fn assert_outbox_table_primary_key() {} + +/// Verify at compile time that a function has the correct signature for an outbox `on_result` reducer. +/// +/// The reducer must accept `(OutboxRow, Result)` as its user-supplied arguments: +/// `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result)` +pub const fn outbox_typecheck<'de, OutboxRow, T>(_x: impl Reducer<'de, (OutboxRow, Result)>) +where + OutboxRow: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>, + T: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>, +{ + core::mem::forget(_x); +} + mod sealed { pub trait Sealed {} } @@ -544,6 +559,13 @@ pub trait ScheduledTablePrimaryKey: sealed::Sealed {} impl sealed::Sealed for u64 {} impl ScheduledTablePrimaryKey for u64 {} +#[diagnostic::on_unimplemented( + message = "outbox table primary key must be a `u64`", + label = "should be `u64`, not `{Self}`" +)] +pub trait OutboxTablePrimaryKey: sealed::Sealed {} +impl OutboxTablePrimaryKey for u64 {} + /// Used in the last type parameter of `Reducer` to indicate that the /// context argument *should* be passed to the reducer logic. pub struct ContextArg; @@ -763,6 +785,12 @@ pub fn register_table() { table.finish(); + if let Some(outbox) = T::OUTBOX { + module + .inner + .add_outbox(T::TABLE_NAME, outbox.remote_reducer_name, outbox.on_result_reducer_name); + } + module.inner.add_explicit_names(T::explicit_names()); }) } diff --git a/crates/bindings/src/table.rs b/crates/bindings/src/table.rs index 6865aa48e98..d2df94fad1d 100644 --- a/crates/bindings/src/table.rs +++ b/crates/bindings/src/table.rs @@ -142,6 +142,7 @@ pub trait TableInternal: Sized { const SEQUENCES: &'static [u16]; const SCHEDULE: Option> = None; const IS_EVENT: bool = false; + const OUTBOX: Option> = None; /// Returns the ID of this table. fn table_id() -> TableId; @@ -169,6 +170,14 @@ pub struct ScheduleDesc<'a> { pub scheduled_at_column: u16, } +/// Describes the outbox configuration of a table, for inter-database communication. +pub struct OutboxDesc<'a> { + /// The name of the remote reducer to invoke on the target database. + pub remote_reducer_name: &'a str, + /// The local reducer to call with the delivery result, if any. + pub on_result_reducer_name: Option<&'a str>, +} + #[derive(Debug, Clone)] pub struct ColumnDefault { pub col_id: u16, diff --git a/smoketests/tests/outbox.py b/smoketests/tests/outbox.py new file mode 100644 index 00000000000..672dd06a0ce --- /dev/null +++ b/smoketests/tests/outbox.py @@ -0,0 +1,248 @@ +import time + +from .. import Smoketest, random_string + + +class OutboxPingPong(Smoketest): + AUTOPUBLISH = False + + RECEIVER_MODULE = """ +use spacetimedb::{Identity, ReducerContext, SpacetimeType, Table}; + +#[derive(SpacetimeType)] +pub struct Ping { + payload: String, +} + +#[spacetimedb::table(accessor = received_pings, public)] +pub struct ReceivedPing { + #[primary_key] + #[auto_inc] + id: u64, + sender: Identity, + payload: String, +} + +#[spacetimedb::reducer] +pub fn receive_ping(ctx: &ReducerContext, ping: Ping) { + ctx.db.received_pings().insert(ReceivedPing { + id: 0, + sender: ctx.sender(), + payload: ping.payload, + }); +} +""" + + # Receiver that always returns an error from the reducer. + FAILING_RECEIVER_MODULE = """ +use spacetimedb::{ReducerContext, SpacetimeType}; + +#[derive(SpacetimeType)] +pub struct Ping { + payload: String, +} + +#[spacetimedb::reducer] +pub fn receive_ping(ctx: &ReducerContext, ping: Ping) -> Result<(), String> { + Err(format!("deliberate failure for payload: {}", ping.payload)) +} +""" + + SENDER_MODULE = """ +use spacetimedb::{Identity, ReducerContext, SpacetimeType, Table}; + +#[derive(SpacetimeType)] +pub struct Ping { + payload: String, +} + +#[spacetimedb::table(accessor = outbound_pings, public, outbox(receive_ping, on_result = local_callback))] +pub struct OutboundPing { + #[primary_key] + #[auto_inc] + id: u64, + target: Identity, + ping: Ping, +} + +#[spacetimedb::table(accessor = callback_results, public)] +pub struct CallbackResult { + #[primary_key] + #[auto_inc] + id: u64, + payload: String, + status: String, +} + +#[spacetimedb::reducer] +pub fn send_ping(ctx: &ReducerContext, target_hex: String, payload: String) { + let target = Identity::from_hex(&target_hex).expect("target identity should be valid hex"); + ctx.db.outbound_pings().insert(OutboundPing { + id: 0, + target, + ping: Ping { payload }, + }); +} + +#[spacetimedb::reducer] +pub fn local_callback(ctx: &ReducerContext, request: OutboundPing, result: Result<(), String>) { + let status = match result { + Ok(()) => "ok".to_string(), + Err(err) => format!("err:{err}"), + }; + ctx.db.callback_results().insert(CallbackResult { + id: 0, + payload: request.ping.payload, + status, + }); +} +""" + + # --- helpers --- + + def sql_on(self, database_identity, query): + return self.spacetime("sql", "--anonymous", "--", database_identity, query) + + def publish_module_with_cleanup(self, module_code, name_prefix): + """Publish a module and register a cleanup that deletes it. Returns its identity.""" + self.write_module_code(module_code) + self.publish_module(f"{name_prefix}-{random_string()}", clear=False) + identity = self.database_identity + self.addCleanup(lambda db=identity: self.spacetime("delete", "--yes", db)) + return identity + + def poll_until(self, condition, timeout, poll_interval=0.1, timeout_msg="timed out"): + """ + Poll `condition()` every `poll_interval` seconds until it returns a truthy + value or `timeout` seconds elapse. Returns the truthy value on success, + or calls self.fail() on timeout. + """ + deadline = time.time() + timeout + while True: + result = condition() + if result: + return result + if time.time() >= deadline: + msg = timeout_msg() if callable(timeout_msg) else timeout_msg + self.fail(msg) + time.sleep(poll_interval) + + def assert_callback_result(self, sender_identity, payload, expected_status_fragment): + """Assert that exactly one callback row exists for `payload` with the expected status.""" + results = self.sql_on(sender_identity, "SELECT payload, status FROM callback_results") + self.assertEqual(results.count(f'"{payload}"'), 1, results) + self.assertIn(expected_status_fragment, results, results) + + def assert_no_callback_yet(self, sender_identity, payload): + """Assert that no callback row has been recorded for `payload` yet.""" + results = self.sql_on(sender_identity, "SELECT payload, status FROM callback_results") + self.assertNotIn( + f'"{payload}"', + results, + f"callback fired unexpectedly for payload '{payload}':\n{results}", + ) + + def assert_not_redelivered(self, sender_identity, payload, expected_status_fragment, wait=1.0): + """After a short wait, confirm the callback row count has not grown.""" + time.sleep(wait) + self.assert_callback_result(sender_identity, payload, expected_status_fragment) + + def poll_for_callback(self, sender_identity, payload, status_fragment, timeout): + """Poll until a callback row with the given payload and status fragment appears.""" + self.poll_until( + lambda: ( + f'"{payload}"' in (r := self.sql_on(sender_identity, "SELECT payload, status FROM callback_results")) + and status_fragment in r + ), + timeout=timeout, + timeout_msg=lambda: ( + f"timed out waiting for callback with status '{status_fragment}', last query output:\n" + + self.sql_on(sender_identity, "SELECT payload, status FROM callback_results") + ), + ) + + # --- tests --- + + def test_outbox_ping_from_sender_module_reaches_receiver_module(self): + receiver_identity = self.publish_module_with_cleanup(self.RECEIVER_MODULE, "outbox-receiver") + sender_identity = self.publish_module_with_cleanup(self.SENDER_MODULE, "outbox-sender") + + payload = "ping" + self.call("send_ping", receiver_identity, payload) + + self.poll_until( + lambda: f'"{payload}"' in self.sql_on(receiver_identity, "SELECT * FROM received_pings"), + timeout=8, + timeout_msg=lambda: ( + "timed out waiting for ping delivery, last query output:\n" + + self.sql_on(receiver_identity, "SELECT * FROM received_pings") + ), + ) + + self.poll_for_callback(sender_identity, payload, '"ok"', timeout=8) + self.assert_callback_result(sender_identity, payload, '"ok"') + self.assert_not_redelivered(sender_identity, payload, '"ok"') + + def test_outbox_callback_receives_error_when_remote_reducer_fails(self): + """ + When the remote reducer returns Err(...), the sender's on_result callback + should be invoked with the error string, not "ok". + """ + receiver_identity = self.publish_module_with_cleanup(self.FAILING_RECEIVER_MODULE, "outbox-failing-receiver") + sender_identity = self.publish_module_with_cleanup(self.SENDER_MODULE, "outbox-sender") + + payload = "ping-that-will-fail" + self.call("send_ping", receiver_identity, payload) + + self.poll_for_callback(sender_identity, payload, '"err:', timeout=8) + + results = self.sql_on(sender_identity, "SELECT payload, status FROM callback_results") + self.assertEqual(results.count(f'"{payload}"'), 1, results) + self.assertIn('"err:', results) + self.assertIn("deliberate failure for payload", results) + self.assertNotIn('"ok"', results) + + # Reducer errors are terminal — must not be retried. + self.assert_not_redelivered(sender_identity, payload, '"err:') + + def test_outbox_retries_delivery_until_remote_module_is_available(self): + """ + When a message is sent to a non-existent target database, the IDC actor + should receive a transport error (HTTP 404/503) and retry with backoff. + Once the target database is published, the message should eventually be + delivered and the callback fired with "ok". + """ + sender_identity = self.publish_module_with_cleanup(self.SENDER_MODULE, "outbox-sender") + + # Publish then immediately delete a receiver to obtain a real identity + # that is currently absent from the system. + self.write_module_code(self.RECEIVER_MODULE) + receiver_name = f"outbox-receiver-{random_string()}" + self.publish_module(receiver_name, clear=False) + receiver_identity = self.database_identity + self.spacetime("delete", "--yes", receiver_identity) + + payload = "retry-ping" + self.database_identity = sender_identity + self.call("send_ping", receiver_identity, payload) + + # Give the IDC actor a moment to attempt delivery and hit a transport error. + time.sleep(1.0) + self.assert_no_callback_yet(sender_identity, payload) + + # Bring the receiver back online under the same name (same identity). + self.write_module_code(self.RECEIVER_MODULE) + self.publish_module(receiver_name, clear=False) + receiver_identity_new = self.database_identity + self.addCleanup(lambda db=receiver_identity_new: self.spacetime("delete", "--yes", db)) + + self.assertEqual( + receiver_identity, + receiver_identity_new, + "re-published receiver identity differs; retry test requires a stable identity", + ) + + # Wait for the IDC actor to retry and succeed (allow for exponential backoff). + self.poll_for_callback(sender_identity, payload, '"ok"', timeout=30) + self.assert_callback_result(sender_identity, payload, '"ok"') + self.assert_not_redelivered(sender_identity, payload, '"ok"')