diff --git a/crates/client-api-messages/DEVELOP.md b/crates/client-api-messages/DEVELOP.md index 47868d4d3ce..605318f7076 100644 --- a/crates/client-api-messages/DEVELOP.md +++ b/crates/client-api-messages/DEVELOP.md @@ -19,3 +19,8 @@ spacetime generate -p spacetimedb-cli --lang \ --out-dir \ --module-def ws_schema_v2.json ``` + +Note, the v3 WebSocket protocol does not have a separate schema. +It reuses the v2 message schema and only changes the websocket binary framing. +In v2 for example, a WebSocket frame contained a single BSATN-encoded v2 message, +but in v3, a single WebSocket frame may contain a batch of one or more v2 messages. diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index 0935d2e3c55..14ec394670f 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -17,3 +17,4 @@ pub mod common; pub mod v1; pub mod v2; +pub mod v3; diff --git a/crates/client-api-messages/src/websocket/v3.rs b/crates/client-api-messages/src/websocket/v3.rs new file mode 100644 index 00000000000..351546654dd --- /dev/null +++ b/crates/client-api-messages/src/websocket/v3.rs @@ -0,0 +1,13 @@ +//! Binary framing for websocket protocol v3. +//! +//! Unlike v2, v3 does not define a new outer message schema. +//! A single binary websocket payload contains one or more BSATN-encoded +//! [`crate::websocket::v2::ClientMessage`] values from client to server, +//! or one or more consecutive BSATN-encoded [`crate::websocket::v2::ServerMessage`] +//! values from server to client. +//! +//! Client and server may coalesce multiple messages into one websocket payload, +//! or send them separately, regardless of what the other one does, +//! so long as logical order is preserved. + +pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb"; diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index cdbe50ac9f9..6602d34d9a5 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb::Identity; use spacetimedb_client_api_messages::websocket::v1 as ws_v1; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; +use spacetimedb_client_api_messages::websocket::v3 as ws_v3; use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl}; use tokio::sync::{mpsc, watch}; @@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL); #[allow(clippy::declare_interior_mutable_const)] pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL); +#[allow(clippy::declare_interior_mutable_const)] +pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL); pub trait HasWebSocketOptions { fn websocket_options(&self) -> WebSocketOptions; @@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option) } match version { WsVersion::V1 => false, - WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS, + WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS, } } @@ -151,6 +154,13 @@ where } let (res, ws_upgrade, protocol) = ws.select_protocol([ + ( + V3_BIN_PROTOCOL, + NegotiatedProtocol { + protocol: Protocol::Binary, + version: WsVersion::V3, + }, + ), ( V2_BIN_PROTOCOL, NegotiatedProtocol { @@ -284,7 +294,7 @@ where }; client.send_message(None, OutboundMessage::V1(message.into())) } - WsVersion::V2 => { + WsVersion::V2 | WsVersion::V3 => { let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection { identity: client_identity, connection_id, @@ -1296,7 +1306,7 @@ async fn ws_encode_task( let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); let mut in_use_bufs: Vec> = Vec::with_capacity(BUF_POOL_CAPACITY); - while let Some(message) = messages.recv().await { + 'send: while let Some(message) = messages.recv().await { // Drop serialize buffers with no external referent, // returning them to the pool. in_use_bufs.retain(|in_use| !in_use.is_unique()); @@ -1306,16 +1316,22 @@ async fn ws_encode_task( let in_use_buf = match message { OutboundWsMessage::Error(message) => { - if config.version == WsVersion::V2 { - log::error!("dropping v1 error message sent to a v2 client: {:?}", message); + if config.version != WsVersion::V1 { + log::error!( + "dropping v1 error message sent to a binary websocket client: {:?}", + message + ); continue; } - let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await; - metrics.report(None, None, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + None, + None, + ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await, + ) else { + break 'send; + }; in_use } OutboundWsMessage::Message(message) => { @@ -1323,38 +1339,39 @@ async fn ws_encode_task( let num_rows = message.num_rows(); match message { OutboundMessage::V2(server_message) => { - if config.version != WsVersion::V2 { + if config.version == WsVersion::V1 { log::error!("dropping v2 message on v1 connection"); continue; } - let (stats, in_use, mut frames) = - ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await; - metrics.report(workload, num_rows, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + workload, + num_rows, + ws_encode_binary_message(config, buf, server_message, false, &bsatn_rlb_pool).await, + ) else { + break 'send; + }; in_use } OutboundMessage::V1(message) => { - if config.version == WsVersion::V2 { - log::error!( - "dropping v1 message for v2 connection until v2 serialization is implemented: {:?}", - message - ); + if config.version != WsVersion::V1 { + log::error!("dropping v1 message for a binary websocket connection: {:?}", message); continue; } let is_large = num_rows.is_some_and(|n| n > 1024); - let (stats, in_use, mut frames) = - ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await; - metrics.report(workload, num_rows, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + workload, + num_rows, + ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await, + ) else { + break 'send; + }; in_use } } @@ -1370,6 +1387,21 @@ async fn ws_encode_task( } } +/// Reports encode metrics for an already-encoded message and forwards its +/// frames to the websocket send task. +fn ws_forward_frames( + metrics: &SendMetrics, + outgoing_frames: &mpsc::UnboundedSender, + workload: Option, + num_rows: Option, + encoded: (EncodeMetrics, InUseSerializeBuffer, impl IntoIterator), +) -> Result> { + let (stats, in_use, frames) = encoded; + metrics.report(workload, num_rows, stats); + frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?; + Ok(in_use) +} + /// Some stats about serialization and compression. /// /// Returned by [`ws_encode_message`]. @@ -1443,8 +1475,7 @@ async fn ws_encode_message( (metrics, msg_alloc, frames) } -#[allow(dead_code, unused_variables)] -async fn ws_encode_message_v2( +async fn ws_encode_binary_message( config: ClientConfig, buf: SerializeBuffer, message: ws_v2::ServerMessage, @@ -1452,12 +1483,13 @@ async fn ws_encode_message_v2( bsatn_rlb_pool: &BsatnRowListBuilderPool, ) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator + use<>) { let start = Instant::now(); + let compression = config.compression; let (in_use, data) = if is_large_message { let bsatn_rlb_pool = bsatn_rlb_pool.clone(); - spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await + spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, compression)).await } else { - serialize_v2(bsatn_rlb_pool, buf, message, config.compression) + serialize_v2(bsatn_rlb_pool, buf, message, compression) }; let metrics = EncodeMetrics { @@ -2298,9 +2330,11 @@ mod tests { #[test] fn confirmed_reads_default_depends_on_ws_version() { + assert!(resolve_confirmed_reads_default(WsVersion::V3, None)); assert!(resolve_confirmed_reads_default(WsVersion::V2, None)); assert!(!resolve_confirmed_reads_default(WsVersion::V1, None)); assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true))); + assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false))); assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false))); } diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index cad4f79adcf..4411192c625 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -7,6 +7,7 @@ pub mod consume_each_list; mod message_handlers; mod message_handlers_v1; mod message_handlers_v2; +mod message_handlers_v3; pub mod messages; pub use client_connection::{ diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 6fb8d8e1623..0a7a7f1a11b 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -47,6 +47,7 @@ pub enum Protocol { pub enum WsVersion { V1, V2, + V3, } impl Protocol { @@ -384,7 +385,7 @@ impl ClientConnectionSender { debug_assert!( matches!( (&self.config.version, &message), - (WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_)) + (WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_)) ), "attempted to send message variant that does not match client websocket version" ); diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index 76f5fa53afa..fb85730c11c 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst match client.config.version { WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await, WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await, + WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await, } } diff --git a/crates/core/src/client/message_handlers_v2.rs b/crates/core/src/client/message_handlers_v2.rs index 5dd2f80d01b..2db523e472d 100644 --- a/crates/core/src/client/message_handlers_v2.rs +++ b/crates/core/src/client/message_handlers_v2.rs @@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst ))) } }; + handle_decoded_message(client, message, timer).await +} + +pub(super) async fn handle_decoded_message( + client: &ClientConnection, + message: ws_v2::ClientMessage, + timer: Instant, +) -> Result<(), MessageHandleError> { let module = client.module(); let mod_info = module.info(); let mod_metrics = &mod_info.metrics; diff --git a/crates/core/src/client/message_handlers_v3.rs b/crates/core/src/client/message_handlers_v3.rs new file mode 100644 index 00000000000..2d0103cc2a3 --- /dev/null +++ b/crates/core/src/client/message_handlers_v3.rs @@ -0,0 +1,34 @@ +use super::{ClientConnection, DataMessage, MessageHandleError}; +use serde::de::Error as _; +use spacetimedb_lib::bsatn; +use std::time::Instant; + +const EMPTY_V3_PAYLOAD_ERR: &str = "v3 websocket binary payload must contain at least one v2 client message"; + +pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> { + client.observe_websocket_request_message(&message); + match message { + DataMessage::Binary(message_buf) => { + let mut remaining = &message_buf[..]; + + if remaining.is_empty() { + return Err(bsatn::DecodeError::Other(EMPTY_V3_PAYLOAD_ERR.into()).into()); + } + + loop { + let message = bsatn::from_reader(&mut remaining)?; + super::message_handlers_v2::handle_decoded_message(client, message, timer).await?; + if remaining.is_empty() { + break; + } + } + } + DataMessage::Text(_) => { + return Err(MessageHandleError::TextDecode(serde_json::Error::custom( + "v3 websocket does not support text messages", + ))) + } + } + + Ok(()) +} diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index ed65e092d0e..b8f1cc87e39 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -97,6 +97,25 @@ impl SerializeBuffer { } } +/// Finalize a binary websocket payload by optionally compressing the serialized +/// bytes after the caller has written the protocol-specific payload body. +/// +/// Callers are responsible for writing the compression tag before invoking this +/// helper. +fn finalize_binary_serialize_buffer( + buffer: SerializeBuffer, + uncompressed_len: usize, + compression: ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes) { + match decide_compression(uncompressed_len, compression) { + ws_v1::Compression::None => buffer.uncompressed(), + ws_v1::Compression::Brotli => { + buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) + } + ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), + } +} + type BytesMutWriter<'a> = bytes::buf::Writer<&'a mut BytesMut>; pub enum InUseSerializeBuffer { @@ -159,28 +178,20 @@ pub fn serialize( let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { bsatn::to_writer(w.into_inner(), &msg).unwrap() }); + let srv_msg_len = srv_msg.len(); // At this point, we no longer have a use for `msg`, // so try to reclaim its buffers. msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); // Conditionally compress the message. - let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) { - ws_v1::Compression::None => buffer.uncompressed(), - ws_v1::Compression::Brotli => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) - } - ws_v1::Compression::Gzip => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress) - } - }; + let (in_use, msg_bytes) = finalize_binary_serialize_buffer(buffer, srv_msg_len, config.compression); (in_use, msg_bytes.into()) } } } /// Serialize `msg` into a [`DataMessage`] containing a [`ws_v2::ServerMessage`]. -/// /// This mirrors the v1 framing by prepending the compression tag and applying /// conditional compression when configured. pub fn serialize_v2( @@ -192,18 +203,13 @@ pub fn serialize_v2( let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { bsatn::to_writer(w.into_inner(), &msg).expect("should be able to bsatn encode v2 message"); }); + let srv_msg_len = srv_msg.len(); // At this point, we no longer have a use for `msg`, // so try to reclaim its buffers. msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); - match decide_compression(srv_msg.len(), compression) { - ws_v1::Compression::None => buffer.uncompressed(), - ws_v1::Compression::Brotli => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) - } - ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), - } + finalize_binary_serialize_buffer(buffer, srv_msg_len, compression) } #[derive(Debug, From)] diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 301bbc47767..763bfeb1676 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1639,7 +1639,7 @@ impl ModuleSubscriptions { message, ); } - WsVersion::V2 => { + WsVersion::V2 | WsVersion::V3 => { if let Some(request_id) = event.request_id { self.send_reducer_failure_result_v2(client, &event, request_id); }