From 35f127a6631209e624f934a040412ab482b58090 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 1 Jun 2026 21:38:09 +0530 Subject: [PATCH 1/4] megre SingleCoreExecutor & SingleThreadedExecutor into one --- crates/core/src/host/module_host.rs | 49 +++----- crates/core/src/host/wasmtime/mod.rs | 13 +- crates/core/src/util/jobs.rs | 176 +++++++++------------------ 3 files changed, 77 insertions(+), 161 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 70310d7baec..9b86fa055f1 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -29,7 +29,7 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; -use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor, SingleThreadedExecutor}; +use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; @@ -343,8 +343,7 @@ pub enum ModuleWithInstance { Wasm { module: super::wasmtime::Module, procedure_module: super::wasmtime::ProcedureModule, - main_thread_name: String, - procedure_thread_name: String, + thread_name: String, core: AllocatedJobCore, init_inst: Box, procedure_instance_pool_size: NonZeroUsize, @@ -408,9 +407,8 @@ impl WasmtimeModuleState { } } -/// Wasm uses a single-core executor backed by a Tokio single threaded runtime -/// for async procedures. It uses an executor backed by a single OS-thread for -/// everything else. +/// Wasm uses a single executor backed by a single OS thread with a Tokio LocalSet +/// for async procedures; synchronous reducers run inline on the same thread. /// /// Note, procedures acquire a module instance from the async procedure pool /// before being enqueued by the executor. @@ -419,8 +417,7 @@ impl WasmtimeModuleState { /// to acquire. struct WasmtimeModuleHost { module: Arc, - main_executor: SingleThreadedExecutor, - procedure_executor: SingleCoreExecutor, + executor: SingleCoreExecutor, procedure_instances: Arc, } @@ -436,7 +433,7 @@ impl WasmtimeModuleHost { A: Send + 'static, { let label = label.to_owned(); - self.main_executor.enqueue_job(move |state| { + self.executor.enqueue_sync_job(move |state| { scopeguard::defer_on_unwind!({ log::warn!("wasm main operation {label} panicked"); on_panic(); @@ -462,7 +459,7 @@ impl WasmtimeModuleHost { let instance_manager = self.procedure_instances.clone(); let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; let label = label.to_owned(); - self.procedure_executor.enqueue_job(async move || { + self.executor.enqueue_async_job(async move || { scopeguard::defer_on_unwind!({ log::warn!("wasm procedure {label} panicked"); on_panic(); @@ -1710,8 +1707,7 @@ impl ModuleHost { ModuleWithInstance::Wasm { module, procedure_module, - main_thread_name, - procedure_thread_name, + thread_name, core, init_inst, procedure_instance_pool_size, @@ -1722,20 +1718,10 @@ impl ModuleHost { let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone()); - // The wasm main and procedure executors run on separate OS threads, - // but they intentionally share one database core allocation. - // When core pinning is enabled, both threads pin to the same core - // and rebalance together because they use clones of the same `CorePinner`. let (load_balance_guard, core_pinner) = core.into_shared(); - let main_executor = AllocatedJobCore::spawn_executor( - load_balance_guard.clone(), - core_pinner.clone(), - main_state, - main_thread_name, - ); - let procedure_executor = - AllocatedJobCore::spawn_async_executor(load_balance_guard, core_pinner, procedure_thread_name); + let executor = + AllocatedJobCore::spawn_executor(load_balance_guard, core_pinner, main_state, thread_name); let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( procedure_module, None, @@ -1744,8 +1730,7 @@ impl ModuleHost { )); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { module, - main_executor, - procedure_executor, + executor, procedure_instances, }))) } @@ -1854,9 +1839,9 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.main_executor.clone(); + let executor = host.executor.clone(); executor - .run_job(move |state| { + .run_sync_job(move |state| { state.with_instance(move |inst| { drop(timer_guard); wasm(arg, inst) @@ -1898,12 +1883,12 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.procedure_executor.clone(); + let executor = host.executor.clone(); let instance_manager = host.procedure_instances.clone(); instance_manager .with_instance(async move |mut inst| { executor - .run_job(async move || { + .run_async_job(async move || { drop(timer_guard); let res = wasm(arg, &mut inst).await; (res, inst) @@ -3203,10 +3188,10 @@ impl ModuleHost { request, |request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await }, move |request, wasm_host, on_panic, timer_guard| { - let executor = wasm_host.main_executor.clone(); + let executor = wasm_host.executor.clone(); let info = wasm_host.module.info(); let label = label.to_owned(); - executor.enqueue_job(move |_| { + executor.enqueue_sync_job(move |_| { scopeguard::defer_on_unwind!({ log::warn!("websocket one-off query operation {label} panicked"); on_panic(); diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 45eaaab56fc..9950071e172 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -145,13 +145,6 @@ fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) - format!("wasm-main-{suffix}") } -fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { - let hex = database_identity.to_hex(); - // We use the tail of the identity to avoid the common structured prefix. - let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; - format!("wasm-proc-{suffix}") -} - impl WasmtimeRuntime { pub fn make_actor( &self, @@ -182,16 +175,14 @@ impl WasmtimeRuntime { let module = WasmtimeModule::new(module); let procedure_module = WasmtimeAsyncModule::new(procedure_module); - let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity); - let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity); + let thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity); let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?; let procedure_module = module.with_runtime_module(procedure_module)?; Ok(super::module_host::ModuleWithInstance::Wasm { module, procedure_module, - main_thread_name, - procedure_thread_name, + thread_name, core, init_inst: Box::new(init_inst), procedure_instance_pool_size: self.config.procedure_instance_pool_size, diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 89c18ee5242..3242ccb0f72 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,5 +1,5 @@ use std::panic::AssertUnwindSafe; -use std::sync::{mpsc as std_mpsc, Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; use futures::future::LocalBoxFuture; @@ -16,7 +16,7 @@ use crate::util::thread_scheduling::apply_compute_thread_hint; /// A handle to a pool of Tokio executors for running database WASM code on. /// /// Each database has a [`SingleCoreExecutor`], -/// a handle to a single-threaded Tokio runtime which is pinned to a specific CPU core. +/// a handle to a single OS thread with a Tokio LocalSet which is pinned to a specific CPU core. /// In multi-tenant environments, multiple databases' [`SingleCoreExecutor`]s may be handles on the same runtime/core, /// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// @@ -224,22 +224,13 @@ impl AllocatedJobCore { } /// Spawn a [`SingleCoreExecutor`] allocated to this core. - pub fn spawn_async_executor( - guard: Arc, - pinner: CorePinner, - name: impl Into, - ) -> SingleCoreExecutor { - SingleCoreExecutor::spawn_and_pin(guard, pinner, Some(name.into())) - } - - /// Spawn a [`SingleThreadedExecutor`] allocated to this core. pub fn spawn_executor( guard: Arc, pinner: CorePinner, state: S, name: impl Into, - ) -> SingleThreadedExecutor { - SingleThreadedExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) + ) -> SingleCoreExecutor { + SingleCoreExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) } } @@ -291,7 +282,12 @@ impl CorePinner { } } -/// A handle to a Tokio executor which can be used to run WASM compute for a particular database. +enum ExecutorJob { + Async(Box LocalBoxFuture<'static, ()> + Send>), + Sync(Box), +} + +/// A handle to a single-threaded executor for database work. /// /// Use [`Self::run_job`] to run futures, and [`Self::run_sync_job`] to run functions. /// @@ -299,25 +295,31 @@ impl CorePinner { /// When all handles on this database executor have been dropped, /// its use of the core to which it is pinned will be released, /// and other databases may be migrated to that core to balance load. -#[derive(Clone)] -pub struct SingleCoreExecutor { - inner: Arc, +pub struct SingleCoreExecutor { + inner: Arc>, } -struct SingleCoreExecutorInner { - /// The sending end of a channel over which we send jobs. - job_tx: mpsc::UnboundedSender LocalBoxFuture<'static, ()> + Send>>, +impl Clone for SingleCoreExecutor { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } } -impl SingleCoreExecutor { - /// Spawn a `SingleCoreExecutor` on the given core. - fn spawn(core: AllocatedJobCore, name: Option) -> Self { - let (guard, pinner) = core.into_shared(); - Self::spawn_and_pin(guard, pinner, name) - } +struct SingleCoreExecutorInner { + /// The sending end of a channel over which we send jobs. + job_tx: mpsc::UnboundedSender>, +} - fn spawn_and_pin(guard: Arc, mut pinner: CorePinner, name: Option) -> Self { - let (job_tx, mut job_rx) = mpsc::unbounded_channel(); +impl SingleCoreExecutor { + fn spawn_and_pin( + guard: Arc, + mut pinner: CorePinner, + mut state: S, + name: Option, + ) -> Self { + let (job_tx, mut job_rx) = mpsc::unbounded_channel::>(); let inner = Arc::new(SingleCoreExecutorInner { job_tx }); @@ -333,9 +335,21 @@ impl SingleCoreExecutor { let _entered = rt.enter(); let local = tokio::task::LocalSet::new(); + // Clone the pinner: one for inline repinning in the sync job handler, + // one for the background re-pinning watcher. + let mut loop_pinner = pinner.clone(); + let job_loop = async { while let Some(job) = job_rx.recv().await { - local.spawn_local(job()); + match job { + ExecutorJob::Async(job) => { + local.spawn_local(job()); + } + ExecutorJob::Sync(job) => { + loop_pinner.pin_if_changed(); + job(&mut state); + } + } } }; @@ -353,18 +367,8 @@ impl SingleCoreExecutor { Self { inner } } - /// Create a `SingleCoreExecutor` which runs jobs in [`tokio::runtime::Handle::current`]. - /// - /// Callers should most likely instead construct a `SingleCoreExecutor` via [`JobCores::take`], - /// which will intelligently pin each database to a particular core. - /// This method should only be used for short-lived instances which do not perform intense computation, - /// e.g. to extract the schema by calling `describe_module`. - pub fn in_current_tokio_runtime() -> Self { - Self::spawn(AllocatedJobCore::default(), None) - } - - /// Run a job for this database executor. - pub async fn run_job(&self, f: F) -> R + /// Run an async job on this executor and return its result. + pub async fn run_async_job(&self, f: F) -> R where F: AsyncFnOnce() -> R + Send + 'static, R: Send + 'static, @@ -374,7 +378,7 @@ impl SingleCoreExecutor { self.inner .job_tx - .send(Box::new(move || { + .send(ExecutorJob::Async(Box::new(move || { async move { let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; if let Err(Err(_panic)) = tx.send(result) { @@ -382,7 +386,7 @@ impl SingleCoreExecutor { } } .boxed_local() - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); match rx.await.unwrap() { @@ -392,7 +396,7 @@ impl SingleCoreExecutor { } /// Enqueue a job for this database executor without waiting for its result. - pub fn enqueue_job(&self, f: F) + pub fn enqueue_async_job(&self, f: F) where F: AsyncFnOnce() + Send + 'static, { @@ -400,83 +404,19 @@ impl SingleCoreExecutor { self.inner .job_tx - .send(Box::new(move || { + .send(ExecutorJob::Async(Box::new(move || { async move { if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() { tracing::warn!("uncaught panic on `SingleCoreExecutor`") } } .boxed_local() - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); } /// Run `f` on this database executor and return its result. pub async fn run_sync_job(&self, f: F) -> R - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.run_job(async || f()).await - } -} - -/// A handle to a plain OS-thread executor for synchronous database work. -/// -/// Unlike [`SingleCoreExecutor`], it is intended for synchronous runtimes. -/// This executor never enters Tokio and never polls futures on its worker -/// thread. -pub struct SingleThreadedExecutor { - inner: Arc>, -} - -struct SingleThreadedExecutorInner { - job_tx: std_mpsc::Sender>, -} - -type SyncJob = Box; - -impl Clone for SingleThreadedExecutor { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl SingleThreadedExecutor { - fn spawn_and_pin( - guard: Arc, - mut pinner: CorePinner, - mut state: S, - name: Option, - ) -> Self { - let (job_tx, job_rx) = std_mpsc::channel::>(); - - let inner = Arc::new(SingleThreadedExecutorInner { job_tx }); - - let mut thread = std::thread::Builder::new(); - if let Some(name) = name { - thread = thread.name(name); - } - let worker = move || { - let _guard = guard; - pinner.pin_now(); - - while let Ok(job) = job_rx.recv() { - pinner.pin_if_changed(); - job(&mut state); - } - }; - thread - .spawn(worker) - .expect("failed to spawn thread for `SingleThreadedExecutor`"); - - Self { inner } - } - - /// Run `f` on this database executor and return its result. - pub async fn run_job(&self, f: F) -> R where F: FnOnce(&mut S) -> R + Send + 'static, R: Send + 'static, @@ -486,15 +426,15 @@ impl SingleThreadedExecutor { self.inner .job_tx - .send(Box::new(move |state| { + .send(ExecutorJob::Sync(Box::new(move |state| { let result = std::panic::catch_unwind(AssertUnwindSafe(|| { let _entered = span.enter(); f(state) })); if let Err(Err(_panic)) = tx.send(result) { - tracing::warn!("uncaught panic on `SingleThreadedExecutor`") + tracing::warn!("uncaught panic on `SingleCoreExecutor`") } - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); match rx.await.unwrap() { @@ -503,8 +443,8 @@ impl SingleThreadedExecutor { } } - /// Enqueue `f` without waiting for its result. - pub fn enqueue_job(&self, f: F) + /// Enqueue a job for this database executor without waiting for its result. + pub fn enqueue_sync_job(&self, f: F) where F: FnOnce(&mut S) + Send + 'static, { @@ -512,16 +452,16 @@ impl SingleThreadedExecutor { self.inner .job_tx - .send(Box::new(move |state| { + .send(ExecutorJob::Sync(Box::new(move |state| { if std::panic::catch_unwind(AssertUnwindSafe(|| { let _entered = span.enter(); f(state); })) .is_err() { - tracing::warn!("uncaught panic on `SingleThreadedExecutor`") + tracing::warn!("uncaught panic on `SingleCoreExecutor`") } - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); } } From 845b19f2c75e94a8040582661cad0df11aa9e63b Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 3 Jun 2026 12:06:09 +0530 Subject: [PATCH 2/4] SingleThreadExecutor --- crates/core/src/host/module_host.rs | 4 +- crates/core/src/util/jobs.rs | 60 ++++++++++++++--------------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9b86fa055f1..030b5390f69 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -29,7 +29,7 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; -use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor}; +use crate::util::jobs::{AllocatedJobCore, SingleThreadedExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; @@ -417,7 +417,7 @@ impl WasmtimeModuleState { /// to acquire. struct WasmtimeModuleHost { module: Arc, - executor: SingleCoreExecutor, + executor: SingleThreadedExecutor, procedure_instances: Arc, } diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 3242ccb0f72..a3cd90feb0e 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -15,10 +15,10 @@ use crate::util::thread_scheduling::apply_compute_thread_hint; /// A handle to a pool of Tokio executors for running database WASM code on. /// -/// Each database has a [`SingleCoreExecutor`], -/// a handle to a single OS thread with a Tokio LocalSet which is pinned to a specific CPU core. -/// In multi-tenant environments, multiple databases' [`SingleCoreExecutor`]s may be handles on the same runtime/core, -/// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. +/// Each database has a [`SingleThreadedExecutor`], +/// a handle to a single OS thread with a Tokio LocalSet. +/// In multi-tenant environments, multiple databases' [`SingleThreadedExecutor`]s may be handles on the same runtime/core, +/// and a [`SingleThreadedExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// /// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. /// A `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, @@ -39,29 +39,29 @@ enum JobCoresInner { } struct PinnedCoresExecutorManager { - /// Channels to request that a [`SingleCoreExecutor`] move to a different core. + /// Channels to request that a [`SingleThreadedExecutor`] move to a different core. /// /// The [`CoreId`] that an executor is pinned to is used as an index into /// `self.cores` to make load-balancing decisions when freeing a database /// executor in [`Self::deallocate`]. - database_executor_move: HashMap>, + database_executor_move: HashMap>, cores: IndexMap, /// An index into `cores` of the next core to put a new job onto. /// /// This acts as a partition point in `cores`; all cores in `..index` have /// one fewer job on them than the cores in `index..`. next_core: usize, - next_id: SingleCoreExecutorId, + next_id: SingleThreadedExecutorId, } -/// Remembers the [`SingleCoreExecutorId`]s for all databases sharing that executor. +/// Remembers the [`SingleThreadedExecutorId`]s for all databases sharing that executor. #[derive(Default)] struct CoreInfo { - jobs: SmallVec<[SingleCoreExecutorId; 4]>, + jobs: SmallVec<[SingleThreadedExecutorId; 4]>, } #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct SingleCoreExecutorId(usize); +struct SingleThreadedExecutorId(usize); impl JobCores { /// Get an [`AllocatedCore`] for a job thread. @@ -93,7 +93,7 @@ impl JobCores { database_executor_move: HashMap::default(), cores, next_core: 0, - next_id: SingleCoreExecutorId(0), + next_id: SingleThreadedExecutorId(0), }))) }; @@ -117,7 +117,7 @@ impl PinnedCoresExecutorManager { /// and store state in `self` necessary to move that database to a new core /// for load-balancing purposes. /// - /// The returned [`SingleCoreExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) + /// The returned [`SingleThreadedExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) /// which should be passed to [`Self::deallocate`] when the database is no longer using this executor. /// This is done automatically by [`LoadBalanceOnDropGuard`]. /// @@ -126,7 +126,7 @@ impl PinnedCoresExecutorManager { /// replaced to balance databases among available cores, so databases should /// either spawn [`CorePinner::run`] as a thread-local async task, or call /// [`CorePinner::pin_now`] frequently. - fn allocate(&mut self) -> (SingleCoreExecutorId, CorePinner) { + fn allocate(&mut self) -> (SingleThreadedExecutorId, CorePinner) { // Determine the next job ID. let database_executor_id = self.next_id; self.next_id.0 += 1; @@ -156,8 +156,8 @@ impl PinnedCoresExecutorManager { /// Mark the executor at `id` as no longer in use, free internal state which tracks it, /// and move other executors to different cores as necessary to maintain a balanced distribution. /// - /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleCoreExecutor`] is no longer in use. - fn deallocate(&mut self, id: SingleCoreExecutorId) { + /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleThreadedExecutor`] is no longer in use. + fn deallocate(&mut self, id: SingleThreadedExecutorId) { // Determine the `CoreId` that will now have one less job. // The `id`s came from `self.allocate()`, // so there must be a `database_executor_move` for it. @@ -223,14 +223,14 @@ impl AllocatedJobCore { (Arc::new(self.guard), self.pinner) } - /// Spawn a [`SingleCoreExecutor`] allocated to this core. + /// Spawn a [`SingleThreadedExecutor`] allocated to this core. pub fn spawn_executor( guard: Arc, pinner: CorePinner, state: S, name: impl Into, - ) -> SingleCoreExecutor { - SingleCoreExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) + ) -> SingleThreadedExecutor { + SingleThreadedExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) } } @@ -295,11 +295,11 @@ enum ExecutorJob { /// When all handles on this database executor have been dropped, /// its use of the core to which it is pinned will be released, /// and other databases may be migrated to that core to balance load. -pub struct SingleCoreExecutor { - inner: Arc>, +pub struct SingleThreadedExecutor { + inner: Arc>, } -impl Clone for SingleCoreExecutor { +impl Clone for SingleThreadedExecutor { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -307,12 +307,12 @@ impl Clone for SingleCoreExecutor { } } -struct SingleCoreExecutorInner { +struct SingleThreadedExecutorInner { /// The sending end of a channel over which we send jobs. job_tx: mpsc::UnboundedSender>, } -impl SingleCoreExecutor { +impl SingleThreadedExecutor { fn spawn_and_pin( guard: Arc, mut pinner: CorePinner, @@ -321,7 +321,7 @@ impl SingleCoreExecutor { ) -> Self { let (job_tx, mut job_rx) = mpsc::unbounded_channel::>(); - let inner = Arc::new(SingleCoreExecutorInner { job_tx }); + let inner = Arc::new(SingleThreadedExecutorInner { job_tx }); let rt = runtime::Handle::current(); let mut thread = std::thread::Builder::new(); @@ -362,7 +362,7 @@ impl SingleCoreExecutor { // dropped and cancelled. rt.block_on(local) }; - thread.spawn(worker).expect("failed to spawn SingleCoreExecutor thread"); + thread.spawn(worker).expect("failed to spawn SingleThreadedExecutor thread"); Self { inner } } @@ -382,7 +382,7 @@ impl SingleCoreExecutor { async move { let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; if let Err(Err(_panic)) = tx.send(result) { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } } .boxed_local() @@ -407,7 +407,7 @@ impl SingleCoreExecutor { .send(ExecutorJob::Async(Box::new(move || { async move { if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } } .boxed_local() @@ -432,7 +432,7 @@ impl SingleCoreExecutor { f(state) })); if let Err(Err(_panic)) = tx.send(result) { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } }))) .unwrap_or_else(|_| panic!("job thread exited")); @@ -459,7 +459,7 @@ impl SingleCoreExecutor { })) .is_err() { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } }))) .unwrap_or_else(|_| panic!("job thread exited")); @@ -470,7 +470,7 @@ impl SingleCoreExecutor { /// allowing databases from more-contended runtimes/cores to migrate there. #[derive(Default)] pub struct LoadBalanceOnDropGuard { - inner: Option<(Weak>, SingleCoreExecutorId)>, + inner: Option<(Weak>, SingleThreadedExecutorId)>, } impl Drop for LoadBalanceOnDropGuard { From d35fd0ac4fe2ddd40d1337c25bb0163cd42e2407 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 3 Jun 2026 12:16:41 +0530 Subject: [PATCH 3/4] address comments --- crates/core/src/host/module_host.rs | 4 +--- crates/core/src/host/wasmtime/mod.rs | 10 +++++----- crates/core/src/util/jobs.rs | 11 +++-------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 030b5390f69..c6b432985b4 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1718,10 +1718,8 @@ impl ModuleHost { let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone()); - let (load_balance_guard, core_pinner) = core.into_shared(); - let executor = - AllocatedJobCore::spawn_executor(load_balance_guard, core_pinner, main_state, thread_name); + core.spawn_executor(main_state, thread_name); let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( procedure_module, None, diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 9950071e172..2b0193f948a 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -135,14 +135,14 @@ pub type ProcedureModule = WasmModuleHostActor; pub type ModuleInstance = WasmModuleInstance; // Linux thread names expose at most 15 bytes, so keep the database identity -// suffix short enough to survive after the `wasm-main-`/`wasm-proc-` prefix. -const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 5; +// suffix short enough to survive after the `wasm-` prefix. +const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 10; -fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { +fn wasm_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { let hex = database_identity.to_hex(); // We use the tail of the identity to avoid the common structured prefix. let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; - format!("wasm-main-{suffix}") + format!("wasm-{suffix}") } impl WasmtimeRuntime { @@ -175,7 +175,7 @@ impl WasmtimeRuntime { let module = WasmtimeModule::new(module); let procedure_module = WasmtimeAsyncModule::new(procedure_module); - let thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity); + let thread_name = wasm_worker_thread_name(&mcc.replica_ctx.database_identity); let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?; let procedure_module = module.with_runtime_module(procedure_module)?; diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index a3cd90feb0e..ca4fe26665b 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -219,18 +219,13 @@ pub struct AllocatedJobCore { } impl AllocatedJobCore { - pub fn into_shared(self) -> (Arc, CorePinner) { - (Arc::new(self.guard), self.pinner) - } - - /// Spawn a [`SingleThreadedExecutor`] allocated to this core. + /// Spawn a [`SingleThreadedExecutor`] for this allocated core. pub fn spawn_executor( - guard: Arc, - pinner: CorePinner, + self, state: S, name: impl Into, ) -> SingleThreadedExecutor { - SingleThreadedExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) + SingleThreadedExecutor::spawn_and_pin(Arc::new(self.guard), self.pinner, state, Some(name.into())) } } From 9f2b9785e10c2702e8ec0e224acb554e88d1b66e Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 3 Jun 2026 12:19:49 +0530 Subject: [PATCH 4/4] fmt --- crates/core/src/host/module_host.rs | 3 +-- crates/core/src/util/jobs.rs | 10 ++++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c6b432985b4..282af60eb61 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1718,8 +1718,7 @@ impl ModuleHost { let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone()); - let executor = - core.spawn_executor(main_state, thread_name); + let executor = core.spawn_executor(main_state, thread_name); let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( procedure_module, None, diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index ca4fe26665b..9943c505d0c 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -220,11 +220,7 @@ pub struct AllocatedJobCore { impl AllocatedJobCore { /// Spawn a [`SingleThreadedExecutor`] for this allocated core. - pub fn spawn_executor( - self, - state: S, - name: impl Into, - ) -> SingleThreadedExecutor { + pub fn spawn_executor(self, state: S, name: impl Into) -> SingleThreadedExecutor { SingleThreadedExecutor::spawn_and_pin(Arc::new(self.guard), self.pinner, state, Some(name.into())) } } @@ -357,7 +353,9 @@ impl SingleThreadedExecutor { // dropped and cancelled. rt.block_on(local) }; - thread.spawn(worker).expect("failed to spawn SingleThreadedExecutor thread"); + thread + .spawn(worker) + .expect("failed to spawn SingleThreadedExecutor thread"); Self { inner } }