Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 16 additions & 34 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo
use crate::subscription::tx::DeltaTx;
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
use crate::subscription::{execute_plan, execute_plan_for_view};
use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor, SingleThreadedExecutor};
use crate::util::jobs::{AllocatedJobCore, SingleThreadedExecutor};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::Context;
use bytes::Bytes;
Expand Down Expand Up @@ -342,8 +342,7 @@ pub enum ModuleWithInstance {
Wasm {
module: super::wasmtime::Module,
procedure_module: super::wasmtime::ProcedureModule,
main_thread_name: String,
procedure_thread_name: String,
thread_name: String,
core: AllocatedJobCore,
init_inst: Box<super::wasmtime::ModuleInstance>,
procedure_instance_pool_size: NonZeroUsize,
Expand Down Expand Up @@ -407,9 +406,8 @@ impl WasmtimeModuleState {
}
}

/// Wasm uses a single-core executor backed by a Tokio single threaded runtime
/// for async procedures. It uses an executor backed by a single OS-thread for
/// everything else.
/// Wasm uses a single executor backed by a single OS thread with a Tokio LocalSet
/// for async procedures; synchronous reducers run inline on the same thread.
///
/// Note, procedures acquire a module instance from the async procedure pool
/// before being enqueued by the executor.
Expand All @@ -418,8 +416,7 @@ impl WasmtimeModuleState {
/// to acquire.
struct WasmtimeModuleHost {
module: Arc<super::wasmtime::Module>,
main_executor: SingleThreadedExecutor<WasmtimeModuleState>,
procedure_executor: SingleCoreExecutor,
executor: SingleThreadedExecutor<WasmtimeModuleState>,
procedure_instances: Arc<WasmtimeProcedureInstanceManager>,
}

Expand All @@ -435,7 +432,7 @@ impl WasmtimeModuleHost {
A: Send + 'static,
{
let label = label.to_owned();
self.main_executor.enqueue_job(move |state| {
self.executor.enqueue_sync_job(move |state| {
scopeguard::defer_on_unwind!({
log::warn!("wasm main operation {label} panicked");
on_panic();
Expand All @@ -461,7 +458,7 @@ impl WasmtimeModuleHost {
let instance_manager = self.procedure_instances.clone();
let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await;
let label = label.to_owned();
self.procedure_executor.enqueue_job(async move || {
self.executor.enqueue_async_job(async move || {
scopeguard::defer_on_unwind!({
log::warn!("wasm procedure {label} panicked");
on_panic();
Expand Down Expand Up @@ -1709,8 +1706,7 @@ impl ModuleHost {
ModuleWithInstance::Wasm {
module,
procedure_module,
main_thread_name,
procedure_thread_name,
thread_name,
core,
init_inst,
procedure_instance_pool_size,
Expand All @@ -1721,20 +1717,7 @@ impl ModuleHost {
let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity);
let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone());

// The wasm main and procedure executors run on separate OS threads,
// but they intentionally share one database core allocation.
// When core pinning is enabled, both threads pin to the same core
// and rebalance together because they use clones of the same `CorePinner`.
let (load_balance_guard, core_pinner) = core.into_shared();

let main_executor = AllocatedJobCore::spawn_executor(
load_balance_guard.clone(),
core_pinner.clone(),
main_state,
main_thread_name,
);
let procedure_executor =
AllocatedJobCore::spawn_async_executor(load_balance_guard, core_pinner, procedure_thread_name);
let executor = core.spawn_executor(main_state, thread_name);
let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics(
procedure_module,
None,
Expand All @@ -1743,8 +1726,7 @@ impl ModuleHost {
));
Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost {
module,
main_executor,
procedure_executor,
executor,
procedure_instances,
})))
}
Expand Down Expand Up @@ -1853,9 +1835,9 @@ impl ModuleHost {

Ok(match &*self.inner {
ModuleHostInner::Wasm(host) => {
let executor = host.main_executor.clone();
let executor = host.executor.clone();
executor
.run_job(move |state| {
.run_sync_job(move |state| {
state.with_instance(move |inst| {
drop(timer_guard);
wasm(arg, inst)
Expand Down Expand Up @@ -1897,12 +1879,12 @@ impl ModuleHost {

Ok(match &*self.inner {
ModuleHostInner::Wasm(host) => {
let executor = host.procedure_executor.clone();
let executor = host.executor.clone();
let instance_manager = host.procedure_instances.clone();
instance_manager
.with_instance(async move |mut inst| {
executor
.run_job(async move || {
.run_async_job(async move || {
drop(timer_guard);
let res = wasm(arg, &mut inst).await;
(res, inst)
Expand Down Expand Up @@ -3202,10 +3184,10 @@ impl ModuleHost {
request,
|request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await },
move |request, wasm_host, on_panic, timer_guard| {
let executor = wasm_host.main_executor.clone();
let executor = wasm_host.executor.clone();
let info = wasm_host.module.info();
let label = label.to_owned();
executor.enqueue_job(move |_| {
executor.enqueue_sync_job(move |_| {
scopeguard::defer_on_unwind!({
log::warn!("websocket one-off query operation {label} panicked");
on_panic();
Expand Down
21 changes: 6 additions & 15 deletions crates/core/src/host/wasmtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,14 @@ pub type ProcedureModule = WasmModuleHostActor<WasmtimeAsyncModule>;
pub type ModuleInstance = WasmModuleInstance<WasmtimeInstance>;

// Linux thread names expose at most 15 bytes, so keep the database identity
// suffix short enough to survive after the `wasm-main-`/`wasm-proc-` prefix.
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 5;
// suffix short enough to survive after the `wasm-` prefix.
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 10;

fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
fn wasm_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
let hex = database_identity.to_hex();
// We use the tail of the identity to avoid the common structured prefix.
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
format!("wasm-main-{suffix}")
}

fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
let hex = database_identity.to_hex();
// We use the tail of the identity to avoid the common structured prefix.
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
format!("wasm-proc-{suffix}")
format!("wasm-{suffix}")
}

impl WasmtimeRuntime {
Expand Down Expand Up @@ -182,16 +175,14 @@ impl WasmtimeRuntime {

let module = WasmtimeModule::new(module);
let procedure_module = WasmtimeAsyncModule::new(procedure_module);
let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity);
let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity);
let thread_name = wasm_worker_thread_name(&mcc.replica_ctx.database_identity);

let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?;
let procedure_module = module.with_runtime_module(procedure_module)?;
Ok(super::module_host::ModuleWithInstance::Wasm {
module,
procedure_module,
main_thread_name,
procedure_thread_name,
thread_name,
core,
init_inst: Box::new(init_inst),
procedure_instance_pool_size: self.config.procedure_instance_pool_size,
Expand Down
Loading
Loading