Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/client-api-messages/DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ spacetime generate -p spacetimedb-cli --lang <SDK lang> \
--out-dir <sdk WebSocket schema bindings dir> \
--module-def ws_schema_v2.json
```

Note, the v3 WebSocket protocol does not have a separate schema.
It reuses the v2 message schema and only changes the websocket binary framing.
In v2 for example, a WebSocket frame contained a single BSATN-encoded v2 message,
but in v3, a single WebSocket frame may contain a batch of one or more v2 messages.
1 change: 1 addition & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub mod common;
pub mod v1;
pub mod v2;
pub mod v3;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you prefer adding a new version with a wrapper, rather than adding new message variants to v2?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly because this new version does not represent a change in the message format/schema, but rather message transport/framing. I wanted to keep all of the message handlers for v2 essentially unchanged, and I thought this was the most maintainable way to split it.

13 changes: 13 additions & 0 deletions crates/client-api-messages/src/websocket/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Binary framing for websocket protocol v3.
//!
//! Unlike v2, v3 does not define a new outer message schema.
//! A single binary websocket payload contains one or more BSATN-encoded
//! [`crate::websocket::v2::ClientMessage`] values from client to server,
//! or one or more consecutive BSATN-encoded [`crate::websocket::v2::ServerMessage`]
//! values from server to client.
//!
//! Client and server may coalesce multiple messages into one websocket payload,
//! or send them separately, regardless of what the other one does,
//! so long as logical order is preserved.

pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb";
104 changes: 69 additions & 35 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS;
use spacetimedb::Identity;
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_client_api_messages::websocket::v2 as ws_v2;
use spacetimedb_client_api_messages::websocket::v3 as ws_v3;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use tokio::sync::{mpsc, watch};
Expand All @@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT
pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL);

pub trait HasWebSocketOptions {
fn websocket_options(&self) -> WebSocketOptions;
Expand Down Expand Up @@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option<bool>)
}
match version {
WsVersion::V1 => false,
WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS,
WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS,
}
}

Expand Down Expand Up @@ -151,6 +154,13 @@ where
}

let (res, ws_upgrade, protocol) = ws.select_protocol([
(
V3_BIN_PROTOCOL,
NegotiatedProtocol {
protocol: Protocol::Binary,
version: WsVersion::V3,
},
),
(
V2_BIN_PROTOCOL,
NegotiatedProtocol {
Expand Down Expand Up @@ -284,7 +294,7 @@ where
};
client.send_message(None, OutboundMessage::V1(message.into()))
}
WsVersion::V2 => {
WsVersion::V2 | WsVersion::V3 => {
let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection {
identity: client_identity,
connection_id,
Expand Down Expand Up @@ -1296,7 +1306,7 @@ async fn ws_encode_task(
let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY);
let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY);

while let Some(message) = messages.recv().await {
'send: while let Some(message) = messages.recv().await {
// Drop serialize buffers with no external referent,
// returning them to the pool.
in_use_bufs.retain(|in_use| !in_use.is_unique());
Expand All @@ -1306,55 +1316,62 @@ async fn ws_encode_task(

let in_use_buf = match message {
OutboundWsMessage::Error(message) => {
if config.version == WsVersion::V2 {
log::error!("dropping v1 error message sent to a v2 client: {:?}", message);
if config.version != WsVersion::V1 {
log::error!(
"dropping v1 error message sent to a binary websocket client: {:?}",
message
);
continue;
}
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await;
metrics.report(None, None, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
None,
None,
ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
OutboundWsMessage::Message(message) => {
let workload = message.workload();
let num_rows = message.num_rows();
match message {
OutboundMessage::V2(server_message) => {
if config.version != WsVersion::V2 {
if config.version == WsVersion::V1 {
log::error!("dropping v2 message on v1 connection");
continue;
}

let (stats, in_use, mut frames) =
ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_binary_message(config, buf, server_message, false, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
OutboundMessage::V1(message) => {
if config.version == WsVersion::V2 {
log::error!(
"dropping v1 message for v2 connection until v2 serialization is implemented: {:?}",
message
);
if config.version != WsVersion::V1 {
log::error!("dropping v1 message for a binary websocket connection: {:?}", message);
continue;
}

let is_large = num_rows.is_some_and(|n| n > 1024);

let (stats, in_use, mut frames) =
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
}
Expand All @@ -1370,6 +1387,21 @@ async fn ws_encode_task(
}
}

/// Reports encode metrics for an already-encoded message and forwards its
/// frames to the websocket send task.
fn ws_forward_frames(
metrics: &SendMetrics,
outgoing_frames: &mpsc::UnboundedSender<Frame>,
workload: Option<WorkloadType>,
num_rows: Option<usize>,
encoded: (EncodeMetrics, InUseSerializeBuffer, impl IntoIterator<Item = Frame>),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>> {
let (stats, in_use, frames) = encoded;
metrics.report(workload, num_rows, stats);
frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?;
Ok(in_use)
}

/// Some stats about serialization and compression.
///
/// Returned by [`ws_encode_message`].
Expand Down Expand Up @@ -1443,21 +1475,21 @@ async fn ws_encode_message(
(metrics, msg_alloc, frames)
}

#[allow(dead_code, unused_variables)]
async fn ws_encode_message_v2(
async fn ws_encode_binary_message(
config: ClientConfig,
buf: SerializeBuffer,
message: ws_v2::ServerMessage,
is_large_message: bool,
bsatn_rlb_pool: &BsatnRowListBuilderPool,
) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame> + use<>) {
let start = Instant::now();
let compression = config.compression;

let (in_use, data) = if is_large_message {
let bsatn_rlb_pool = bsatn_rlb_pool.clone();
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, compression)).await
} else {
serialize_v2(bsatn_rlb_pool, buf, message, config.compression)
serialize_v2(bsatn_rlb_pool, buf, message, compression)
};

let metrics = EncodeMetrics {
Expand Down Expand Up @@ -2298,9 +2330,11 @@ mod tests {

#[test]
fn confirmed_reads_default_depends_on_ws_version() {
assert!(resolve_confirmed_reads_default(WsVersion::V3, None));
assert!(resolve_confirmed_reads_default(WsVersion::V2, None));
assert!(!resolve_confirmed_reads_default(WsVersion::V1, None));
assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true)));
assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false)));
assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false)));
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod consume_each_list;
mod message_handlers;
mod message_handlers_v1;
mod message_handlers_v2;
mod message_handlers_v3;
pub mod messages;

pub use client_connection::{
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum Protocol {
pub enum WsVersion {
V1,
V2,
V3,
}

impl Protocol {
Expand Down Expand Up @@ -384,7 +385,7 @@ impl ClientConnectionSender {
debug_assert!(
matches!(
(&self.config.version, &message),
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_))
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_))
),
"attempted to send message variant that does not match client websocket version"
);
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
match client.config.version {
WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await,
WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await,
WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await,
}
}
8 changes: 8 additions & 0 deletions crates/core/src/client/message_handlers_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
)))
}
};
handle_decoded_message(client, message, timer).await
}

pub(super) async fn handle_decoded_message(
client: &ClientConnection,
message: ws_v2::ClientMessage,
timer: Instant,
) -> Result<(), MessageHandleError> {
let module = client.module();
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
Expand Down
38 changes: 38 additions & 0 deletions crates/core/src/client/message_handlers_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::{ClientConnection, DataMessage, MessageHandleError};
use serde::de::Error as _;
use spacetimedb_client_api_messages::websocket::v2 as ws_v2;
use spacetimedb_lib::bsatn;
use std::time::Instant;

const EMPTY_V3_PAYLOAD_ERR: &str = "v3 websocket binary payload must contain at least one v2 client message";

fn decode_one_v3_message(remaining: &mut &[u8]) -> Result<ws_v2::ClientMessage, MessageHandleError> {
if remaining.is_empty() {
return Err(bsatn::DecodeError::Other(EMPTY_V3_PAYLOAD_ERR.into()).into());
}
Ok(bsatn::from_reader(remaining)?)
}

pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
client.observe_websocket_request_message(&message);
match message {
DataMessage::Binary(message_buf) => {
let mut remaining = &message_buf[..];

loop {
let message = decode_one_v3_message(&mut remaining)?;
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
if remaining.is_empty() {
break;
}
}
}
DataMessage::Text(_) => {
return Err(MessageHandleError::TextDecode(serde_json::Error::custom(
"v3 websocket does not support text messages",
)))
}
}

Ok(())
}
Loading
Loading