Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ mod sym {
symbol!(update);
symbol!(default);
symbol!(event);
symbol!(outbox);
symbol!(on_result);

symbol!(u8);
symbol!(i8);
Expand Down
173 changes: 173 additions & 0 deletions crates/bindings-macro/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ pub(crate) struct TableArgs {
accessor: Ident,
indices: Vec<IndexArg>,
event: Option<Span>,
outbox: Option<OutboxArg>,
}

/// Parsed from `outbox(remote_reducer_fn, on_result = local_reducer_fn)`.
///
/// For backwards compatibility, `on_result(local_reducer_fn)` is also accepted as a sibling
/// table attribute and attached to the previously-declared outbox.
struct OutboxArg {
span: Span,
/// Path to the remote-side reducer function, used only for its final path segment.
remote_reducer: Path,
/// Path to the local `on_result` reducer, if any.
on_result_reducer: Option<Path>,
}

fn path_tail_name(path: &Path) -> LitStr {
let segment = path
.segments
.last()
.expect("syn::Path should always contain at least one segment");
LitStr::new(&segment.ident.to_string(), segment.ident.span())
}

enum TableAccess {
Expand Down Expand Up @@ -82,6 +103,7 @@ impl TableArgs {
let mut name: Option<LitStr> = None;
let mut indices = Vec::new();
let mut event = None;
let mut outbox: Option<OutboxArg> = None;
syn::meta::parser(|meta| {
match_meta!(match meta {
sym::public => {
Expand Down Expand Up @@ -149,6 +171,30 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {sym}` with `accessor
check_duplicate(&event, &meta)?;
event = Some(meta.path.span());
}
sym::outbox => {
check_duplicate_msg(&outbox, &meta, "already specified outbox")?;
outbox = Some(OutboxArg::parse_meta(meta)?);
}
sym::on_result => {
// `on_result` must be specified alongside `outbox`.
// We parse it here and attach it to the outbox arg below.
let span = meta.path.span();
let on_result_path = OutboxArg::parse_single_path_meta(meta)?;
match &mut outbox {
Some(ob) => {
if ob.on_result_reducer.is_some() {
return Err(syn::Error::new(span, "already specified on_result"));
}
ob.on_result_reducer = Some(on_result_path);
}
None => {
return Err(syn::Error::new(
span,
"on_result requires outbox to be specified first: `outbox(remote_reducer), on_result(local_reducer)`",
))
}
}
}
});
Ok(())
})
Expand Down Expand Up @@ -188,6 +234,7 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {name_str_value:?}` wi
indices,
name,
event,
outbox,
})
}
}
Expand Down Expand Up @@ -231,6 +278,84 @@ impl ScheduledArg {
}
}

impl OutboxArg {
/// Parse `outbox(remote_reducer_path, on_result = local_reducer_path)`.
///
/// For backwards compatibility, `on_result` may also be parsed separately via
/// `parse_single_path_meta` and attached afterwards.
fn parse_meta(meta: ParseNestedMeta) -> syn::Result<Self> {
let span = meta.path.span();
let mut remote_reducer: Option<Path> = None;
let mut on_result_reducer: Option<Path> = None;

meta.parse_nested_meta(|meta| {
if meta.input.peek(Token![=]) {
if meta.path.is_ident("on_result") {
check_duplicate_msg(
&on_result_reducer,
&meta,
"can only specify one on_result reducer",
)?;
on_result_reducer = Some(meta.value()?.parse()?);
Ok(())
} else {
Err(meta.error(
"outbox only supports `on_result = my_local_reducer` as a named argument",
))
}
} else if meta.input.peek(syn::token::Paren) {
Err(meta.error(
"outbox expects a remote reducer path and optional `on_result = my_local_reducer`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`",
))
} else if meta.path.is_ident("on_result") {
Err(meta.error(
"outbox `on_result` must use `=`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`",
))
} else {
check_duplicate_msg(
&remote_reducer,
&meta,
"can only specify one remote reducer for outbox",
)?;
remote_reducer = Some(meta.path);
Ok(())
}
})?;

let remote_reducer = remote_reducer
.ok_or_else(|| syn::Error::new(span, "outbox requires a remote reducer: `outbox(my_remote_reducer)`"))?;

Ok(Self {
span,
remote_reducer,
on_result_reducer,
})
}

/// Parse `on_result(local_reducer_path)` and return the path.
fn parse_single_path_meta(meta: ParseNestedMeta) -> syn::Result<Path> {
let span = meta.path.span();
let mut result: Option<Path> = None;

meta.parse_nested_meta(|meta| {
if meta.input.peek(syn::Token![=]) || meta.input.peek(syn::token::Paren) {
Err(meta.error("on_result takes a single function path, e.g. `on_result(my_local_reducer)`"))
} else {
check_duplicate_msg(&result, &meta, "can only specify one on_result reducer")?;
result = Some(meta.path);
Ok(())
}
})?;

result.ok_or_else(|| {
syn::Error::new(
span,
"on_result requires a local reducer: `on_result(my_local_reducer)`",
)
})
}
}

impl IndexArg {
fn parse_meta(meta: ParseNestedMeta) -> syn::Result<Self> {
let mut accessor = None;
Expand Down Expand Up @@ -1043,6 +1168,9 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
}
};

// Save a clone before the schedule closure captures `primary_key_column` by move.
let primary_key_column_for_outbox = primary_key_column.clone();

let (schedule, schedule_typecheck) = args
.scheduled
.as_ref()
Expand Down Expand Up @@ -1101,6 +1229,49 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
.unzip();
let schedule = schedule.into_iter();

let (outbox, outbox_typecheck) = args
.outbox
.as_ref()
.map(|ob| {
let primary_key_column = primary_key_column_for_outbox.ok_or_else(|| {
syn::Error::new(
ob.span,
"outbox tables must have a `#[primary_key] #[auto_inc]` u64 column at position 0 (the row_id)",
)
})?;
if primary_key_column.index != 0 {
return Err(syn::Error::new(
ob.span,
"outbox tables must have the `#[primary_key] #[auto_inc]` column as the first column (col 0)",
));
}

let remote_reducer_name = path_tail_name(&ob.remote_reducer);
let on_result_reducer_name = match ob.on_result_reducer.as_ref().map(path_tail_name) {
Some(r) => quote!(Some(#r)),
None => quote!(None),
};
let desc = quote!(spacetimedb::table::OutboxDesc {
remote_reducer_name: #remote_reducer_name,
on_result_reducer_name: #on_result_reducer_name,
});

let primary_key_ty = primary_key_column.ty;
let callback_typecheck = ob
.on_result_reducer
.as_ref()
.map(|r| quote!(spacetimedb::rt::outbox_typecheck::<#original_struct_ident>(#r);));
let typecheck = quote! {
spacetimedb::rt::assert_outbox_table_primary_key::<#primary_key_ty>();
#callback_typecheck
};

Ok((desc, typecheck))
})
.transpose()?
.unzip();
let outbox = outbox.into_iter();

let unique_err = if !unique_columns.is_empty() {
quote!(spacetimedb::UniqueConstraintViolation)
} else {
Expand Down Expand Up @@ -1134,6 +1305,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
#(const PRIMARY_KEY: Option<u16> = Some(#primary_col_id);)*
const SEQUENCES: &'static [u16] = &[#(#sequence_col_ids),*];
#(const SCHEDULE: Option<spacetimedb::table::ScheduleDesc<'static>> = Some(#schedule);)*
#(const OUTBOX: Option<spacetimedb::table::OutboxDesc<'static>> = Some(#outbox);)*

#table_id_from_name_func
#default_fn
Expand Down Expand Up @@ -1285,6 +1457,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
const _: () = {
#(let _ = <#field_types as spacetimedb::rt::TableColumn>::_ITEM;)*
#schedule_typecheck
#outbox_typecheck
#default_type_check
};

Expand Down
28 changes: 28 additions & 0 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,21 @@ impl<T: SpacetimeType> TableColumn for T {}
/// Assert that the primary_key column of a scheduled table is a u64.
pub const fn assert_scheduled_table_primary_key<T: ScheduledTablePrimaryKey>() {}

/// Assert that the primary_key column of an outbox table is a u64.
pub const fn assert_outbox_table_primary_key<T: OutboxTablePrimaryKey>() {}

/// Verify at compile time that a function has the correct signature for an outbox `on_result` reducer.
///
/// The reducer must accept `(OutboxRow, Result<T, String>)` as its user-supplied arguments:
/// `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<T, String>)`
pub const fn outbox_typecheck<'de, OutboxRow, T>(_x: impl Reducer<'de, (OutboxRow, Result<T, String>)>)
where
OutboxRow: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>,
T: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>,
{
core::mem::forget(_x);
}

mod sealed {
pub trait Sealed {}
}
Expand All @@ -544,6 +559,13 @@ pub trait ScheduledTablePrimaryKey: sealed::Sealed {}
impl sealed::Sealed for u64 {}
impl ScheduledTablePrimaryKey for u64 {}

#[diagnostic::on_unimplemented(
message = "outbox table primary key must be a `u64`",
label = "should be `u64`, not `{Self}`"
)]
pub trait OutboxTablePrimaryKey: sealed::Sealed {}
impl OutboxTablePrimaryKey for u64 {}

/// Used in the last type parameter of `Reducer` to indicate that the
/// context argument *should* be passed to the reducer logic.
pub struct ContextArg;
Expand Down Expand Up @@ -763,6 +785,12 @@ pub fn register_table<T: Table>() {

table.finish();

if let Some(outbox) = T::OUTBOX {
module
.inner
.add_outbox(T::TABLE_NAME, outbox.remote_reducer_name, outbox.on_result_reducer_name);
}

module.inner.add_explicit_names(T::explicit_names());
})
}
Expand Down
9 changes: 9 additions & 0 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub trait TableInternal: Sized {
const SEQUENCES: &'static [u16];
const SCHEDULE: Option<ScheduleDesc<'static>> = None;
const IS_EVENT: bool = false;
const OUTBOX: Option<OutboxDesc<'static>> = None;

/// Returns the ID of this table.
fn table_id() -> TableId;
Expand Down Expand Up @@ -169,6 +170,14 @@ pub struct ScheduleDesc<'a> {
pub scheduled_at_column: u16,
}

/// Describes the outbox configuration of a table, for inter-database communication.
pub struct OutboxDesc<'a> {
/// The name of the remote reducer to invoke on the target database.
pub remote_reducer_name: &'a str,
/// The local reducer to call with the delivery result, if any.
pub on_result_reducer_name: Option<&'a str>,
}

#[derive(Debug, Clone)]
pub struct ColumnDefault {
pub col_id: u16,
Expand Down
Loading
Loading