diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 77533d952e4..9ce704426f7 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -28,7 +28,7 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; -use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor, SingleThreadedExecutor}; +use crate::util::jobs::{AllocatedJobCore, SingleThreadedExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; @@ -342,8 +342,7 @@ pub enum ModuleWithInstance { Wasm { module: super::wasmtime::Module, procedure_module: super::wasmtime::ProcedureModule, - main_thread_name: String, - procedure_thread_name: String, + thread_name: String, core: AllocatedJobCore, init_inst: Box, procedure_instance_pool_size: NonZeroUsize, @@ -407,9 +406,8 @@ impl WasmtimeModuleState { } } -/// Wasm uses a single-core executor backed by a Tokio single threaded runtime -/// for async procedures. It uses an executor backed by a single OS-thread for -/// everything else. +/// Wasm uses a single executor backed by a single OS thread with a Tokio LocalSet +/// for async procedures; synchronous reducers run inline on the same thread. /// /// Note, procedures acquire a module instance from the async procedure pool /// before being enqueued by the executor. @@ -418,8 +416,7 @@ impl WasmtimeModuleState { /// to acquire. struct WasmtimeModuleHost { module: Arc, - main_executor: SingleThreadedExecutor, - procedure_executor: SingleCoreExecutor, + executor: SingleThreadedExecutor, procedure_instances: Arc, } @@ -435,7 +432,7 @@ impl WasmtimeModuleHost { A: Send + 'static, { let label = label.to_owned(); - self.main_executor.enqueue_job(move |state| { + self.executor.enqueue_sync_job(move |state| { scopeguard::defer_on_unwind!({ log::warn!("wasm main operation {label} panicked"); on_panic(); @@ -461,7 +458,7 @@ impl WasmtimeModuleHost { let instance_manager = self.procedure_instances.clone(); let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; let label = label.to_owned(); - self.procedure_executor.enqueue_job(async move || { + self.executor.enqueue_async_job(async move || { scopeguard::defer_on_unwind!({ log::warn!("wasm procedure {label} panicked"); on_panic(); @@ -1709,8 +1706,7 @@ impl ModuleHost { ModuleWithInstance::Wasm { module, procedure_module, - main_thread_name, - procedure_thread_name, + thread_name, core, init_inst, procedure_instance_pool_size, @@ -1721,20 +1717,7 @@ impl ModuleHost { let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone()); - // The wasm main and procedure executors run on separate OS threads, - // but they intentionally share one database core allocation. - // When core pinning is enabled, both threads pin to the same core - // and rebalance together because they use clones of the same `CorePinner`. - let (load_balance_guard, core_pinner) = core.into_shared(); - - let main_executor = AllocatedJobCore::spawn_executor( - load_balance_guard.clone(), - core_pinner.clone(), - main_state, - main_thread_name, - ); - let procedure_executor = - AllocatedJobCore::spawn_async_executor(load_balance_guard, core_pinner, procedure_thread_name); + let executor = core.spawn_executor(main_state, thread_name); let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( procedure_module, None, @@ -1743,8 +1726,7 @@ impl ModuleHost { )); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { module, - main_executor, - procedure_executor, + executor, procedure_instances, }))) } @@ -1853,9 +1835,9 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.main_executor.clone(); + let executor = host.executor.clone(); executor - .run_job(move |state| { + .run_sync_job(move |state| { state.with_instance(move |inst| { drop(timer_guard); wasm(arg, inst) @@ -1897,12 +1879,12 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.procedure_executor.clone(); + let executor = host.executor.clone(); let instance_manager = host.procedure_instances.clone(); instance_manager .with_instance(async move |mut inst| { executor - .run_job(async move || { + .run_async_job(async move || { drop(timer_guard); let res = wasm(arg, &mut inst).await; (res, inst) @@ -3202,10 +3184,10 @@ impl ModuleHost { request, |request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await }, move |request, wasm_host, on_panic, timer_guard| { - let executor = wasm_host.main_executor.clone(); + let executor = wasm_host.executor.clone(); let info = wasm_host.module.info(); let label = label.to_owned(); - executor.enqueue_job(move |_| { + executor.enqueue_sync_job(move |_| { scopeguard::defer_on_unwind!({ log::warn!("websocket one-off query operation {label} panicked"); on_panic(); diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index e8e4a3d4fc2..ea8bb4f6824 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -135,21 +135,14 @@ pub type ProcedureModule = WasmModuleHostActor; pub type ModuleInstance = WasmModuleInstance; // Linux thread names expose at most 15 bytes, so keep the database identity -// suffix short enough to survive after the `wasm-main-`/`wasm-proc-` prefix. -const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 5; +// suffix short enough to survive after the `wasm-` prefix. +const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 10; -fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { +fn wasm_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { let hex = database_identity.to_hex(); // We use the tail of the identity to avoid the common structured prefix. let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; - format!("wasm-main-{suffix}") -} - -fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { - let hex = database_identity.to_hex(); - // We use the tail of the identity to avoid the common structured prefix. - let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; - format!("wasm-proc-{suffix}") + format!("wasm-{suffix}") } impl WasmtimeRuntime { @@ -182,16 +175,14 @@ impl WasmtimeRuntime { let module = WasmtimeModule::new(module); let procedure_module = WasmtimeAsyncModule::new(procedure_module); - let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity); - let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity); + let thread_name = wasm_worker_thread_name(&mcc.replica_ctx.database_identity); let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?; let procedure_module = module.with_runtime_module(procedure_module)?; Ok(super::module_host::ModuleWithInstance::Wasm { module, procedure_module, - main_thread_name, - procedure_thread_name, + thread_name, core, init_inst: Box::new(init_inst), procedure_instance_pool_size: self.config.procedure_instance_pool_size, diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 89c18ee5242..9943c505d0c 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,5 +1,5 @@ use std::panic::AssertUnwindSafe; -use std::sync::{mpsc as std_mpsc, Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; use futures::future::LocalBoxFuture; @@ -15,10 +15,10 @@ use crate::util::thread_scheduling::apply_compute_thread_hint; /// A handle to a pool of Tokio executors for running database WASM code on. /// -/// Each database has a [`SingleCoreExecutor`], -/// a handle to a single-threaded Tokio runtime which is pinned to a specific CPU core. -/// In multi-tenant environments, multiple databases' [`SingleCoreExecutor`]s may be handles on the same runtime/core, -/// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. +/// Each database has a [`SingleThreadedExecutor`], +/// a handle to a single OS thread with a Tokio LocalSet. +/// In multi-tenant environments, multiple databases' [`SingleThreadedExecutor`]s may be handles on the same runtime/core, +/// and a [`SingleThreadedExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// /// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. /// A `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, @@ -39,29 +39,29 @@ enum JobCoresInner { } struct PinnedCoresExecutorManager { - /// Channels to request that a [`SingleCoreExecutor`] move to a different core. + /// Channels to request that a [`SingleThreadedExecutor`] move to a different core. /// /// The [`CoreId`] that an executor is pinned to is used as an index into /// `self.cores` to make load-balancing decisions when freeing a database /// executor in [`Self::deallocate`]. - database_executor_move: HashMap>, + database_executor_move: HashMap>, cores: IndexMap, /// An index into `cores` of the next core to put a new job onto. /// /// This acts as a partition point in `cores`; all cores in `..index` have /// one fewer job on them than the cores in `index..`. next_core: usize, - next_id: SingleCoreExecutorId, + next_id: SingleThreadedExecutorId, } -/// Remembers the [`SingleCoreExecutorId`]s for all databases sharing that executor. +/// Remembers the [`SingleThreadedExecutorId`]s for all databases sharing that executor. #[derive(Default)] struct CoreInfo { - jobs: SmallVec<[SingleCoreExecutorId; 4]>, + jobs: SmallVec<[SingleThreadedExecutorId; 4]>, } #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct SingleCoreExecutorId(usize); +struct SingleThreadedExecutorId(usize); impl JobCores { /// Get an [`AllocatedCore`] for a job thread. @@ -93,7 +93,7 @@ impl JobCores { database_executor_move: HashMap::default(), cores, next_core: 0, - next_id: SingleCoreExecutorId(0), + next_id: SingleThreadedExecutorId(0), }))) }; @@ -117,7 +117,7 @@ impl PinnedCoresExecutorManager { /// and store state in `self` necessary to move that database to a new core /// for load-balancing purposes. /// - /// The returned [`SingleCoreExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) + /// The returned [`SingleThreadedExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) /// which should be passed to [`Self::deallocate`] when the database is no longer using this executor. /// This is done automatically by [`LoadBalanceOnDropGuard`]. /// @@ -126,7 +126,7 @@ impl PinnedCoresExecutorManager { /// replaced to balance databases among available cores, so databases should /// either spawn [`CorePinner::run`] as a thread-local async task, or call /// [`CorePinner::pin_now`] frequently. - fn allocate(&mut self) -> (SingleCoreExecutorId, CorePinner) { + fn allocate(&mut self) -> (SingleThreadedExecutorId, CorePinner) { // Determine the next job ID. let database_executor_id = self.next_id; self.next_id.0 += 1; @@ -156,8 +156,8 @@ impl PinnedCoresExecutorManager { /// Mark the executor at `id` as no longer in use, free internal state which tracks it, /// and move other executors to different cores as necessary to maintain a balanced distribution. /// - /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleCoreExecutor`] is no longer in use. - fn deallocate(&mut self, id: SingleCoreExecutorId) { + /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleThreadedExecutor`] is no longer in use. + fn deallocate(&mut self, id: SingleThreadedExecutorId) { // Determine the `CoreId` that will now have one less job. // The `id`s came from `self.allocate()`, // so there must be a `database_executor_move` for it. @@ -219,27 +219,9 @@ pub struct AllocatedJobCore { } impl AllocatedJobCore { - pub fn into_shared(self) -> (Arc, CorePinner) { - (Arc::new(self.guard), self.pinner) - } - - /// Spawn a [`SingleCoreExecutor`] allocated to this core. - pub fn spawn_async_executor( - guard: Arc, - pinner: CorePinner, - name: impl Into, - ) -> SingleCoreExecutor { - SingleCoreExecutor::spawn_and_pin(guard, pinner, Some(name.into())) - } - - /// Spawn a [`SingleThreadedExecutor`] allocated to this core. - pub fn spawn_executor( - guard: Arc, - pinner: CorePinner, - state: S, - name: impl Into, - ) -> SingleThreadedExecutor { - SingleThreadedExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) + /// Spawn a [`SingleThreadedExecutor`] for this allocated core. + pub fn spawn_executor(self, state: S, name: impl Into) -> SingleThreadedExecutor { + SingleThreadedExecutor::spawn_and_pin(Arc::new(self.guard), self.pinner, state, Some(name.into())) } } @@ -291,7 +273,12 @@ impl CorePinner { } } -/// A handle to a Tokio executor which can be used to run WASM compute for a particular database. +enum ExecutorJob { + Async(Box LocalBoxFuture<'static, ()> + Send>), + Sync(Box), +} + +/// A handle to a single-threaded executor for database work. /// /// Use [`Self::run_job`] to run futures, and [`Self::run_sync_job`] to run functions. /// @@ -299,27 +286,33 @@ impl CorePinner { /// When all handles on this database executor have been dropped, /// its use of the core to which it is pinned will be released, /// and other databases may be migrated to that core to balance load. -#[derive(Clone)] -pub struct SingleCoreExecutor { - inner: Arc, +pub struct SingleThreadedExecutor { + inner: Arc>, } -struct SingleCoreExecutorInner { - /// The sending end of a channel over which we send jobs. - job_tx: mpsc::UnboundedSender LocalBoxFuture<'static, ()> + Send>>, +impl Clone for SingleThreadedExecutor { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } } -impl SingleCoreExecutor { - /// Spawn a `SingleCoreExecutor` on the given core. - fn spawn(core: AllocatedJobCore, name: Option) -> Self { - let (guard, pinner) = core.into_shared(); - Self::spawn_and_pin(guard, pinner, name) - } +struct SingleThreadedExecutorInner { + /// The sending end of a channel over which we send jobs. + job_tx: mpsc::UnboundedSender>, +} - fn spawn_and_pin(guard: Arc, mut pinner: CorePinner, name: Option) -> Self { - let (job_tx, mut job_rx) = mpsc::unbounded_channel(); +impl SingleThreadedExecutor { + fn spawn_and_pin( + guard: Arc, + mut pinner: CorePinner, + mut state: S, + name: Option, + ) -> Self { + let (job_tx, mut job_rx) = mpsc::unbounded_channel::>(); - let inner = Arc::new(SingleCoreExecutorInner { job_tx }); + let inner = Arc::new(SingleThreadedExecutorInner { job_tx }); let rt = runtime::Handle::current(); let mut thread = std::thread::Builder::new(); @@ -333,9 +326,21 @@ impl SingleCoreExecutor { let _entered = rt.enter(); let local = tokio::task::LocalSet::new(); + // Clone the pinner: one for inline repinning in the sync job handler, + // one for the background re-pinning watcher. + let mut loop_pinner = pinner.clone(); + let job_loop = async { while let Some(job) = job_rx.recv().await { - local.spawn_local(job()); + match job { + ExecutorJob::Async(job) => { + local.spawn_local(job()); + } + ExecutorJob::Sync(job) => { + loop_pinner.pin_if_changed(); + job(&mut state); + } + } } }; @@ -348,23 +353,15 @@ impl SingleCoreExecutor { // dropped and cancelled. rt.block_on(local) }; - thread.spawn(worker).expect("failed to spawn SingleCoreExecutor thread"); + thread + .spawn(worker) + .expect("failed to spawn SingleThreadedExecutor thread"); Self { inner } } - /// Create a `SingleCoreExecutor` which runs jobs in [`tokio::runtime::Handle::current`]. - /// - /// Callers should most likely instead construct a `SingleCoreExecutor` via [`JobCores::take`], - /// which will intelligently pin each database to a particular core. - /// This method should only be used for short-lived instances which do not perform intense computation, - /// e.g. to extract the schema by calling `describe_module`. - pub fn in_current_tokio_runtime() -> Self { - Self::spawn(AllocatedJobCore::default(), None) - } - - /// Run a job for this database executor. - pub async fn run_job(&self, f: F) -> R + /// Run an async job on this executor and return its result. + pub async fn run_async_job(&self, f: F) -> R where F: AsyncFnOnce() -> R + Send + 'static, R: Send + 'static, @@ -374,15 +371,15 @@ impl SingleCoreExecutor { self.inner .job_tx - .send(Box::new(move || { + .send(ExecutorJob::Async(Box::new(move || { async move { let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; if let Err(Err(_panic)) = tx.send(result) { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } } .boxed_local() - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); match rx.await.unwrap() { @@ -392,7 +389,7 @@ impl SingleCoreExecutor { } /// Enqueue a job for this database executor without waiting for its result. - pub fn enqueue_job(&self, f: F) + pub fn enqueue_async_job(&self, f: F) where F: AsyncFnOnce() + Send + 'static, { @@ -400,83 +397,19 @@ impl SingleCoreExecutor { self.inner .job_tx - .send(Box::new(move || { + .send(ExecutorJob::Async(Box::new(move || { async move { if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() { - tracing::warn!("uncaught panic on `SingleCoreExecutor`") + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } } .boxed_local() - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); } /// Run `f` on this database executor and return its result. pub async fn run_sync_job(&self, f: F) -> R - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.run_job(async || f()).await - } -} - -/// A handle to a plain OS-thread executor for synchronous database work. -/// -/// Unlike [`SingleCoreExecutor`], it is intended for synchronous runtimes. -/// This executor never enters Tokio and never polls futures on its worker -/// thread. -pub struct SingleThreadedExecutor { - inner: Arc>, -} - -struct SingleThreadedExecutorInner { - job_tx: std_mpsc::Sender>, -} - -type SyncJob = Box; - -impl Clone for SingleThreadedExecutor { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl SingleThreadedExecutor { - fn spawn_and_pin( - guard: Arc, - mut pinner: CorePinner, - mut state: S, - name: Option, - ) -> Self { - let (job_tx, job_rx) = std_mpsc::channel::>(); - - let inner = Arc::new(SingleThreadedExecutorInner { job_tx }); - - let mut thread = std::thread::Builder::new(); - if let Some(name) = name { - thread = thread.name(name); - } - let worker = move || { - let _guard = guard; - pinner.pin_now(); - - while let Ok(job) = job_rx.recv() { - pinner.pin_if_changed(); - job(&mut state); - } - }; - thread - .spawn(worker) - .expect("failed to spawn thread for `SingleThreadedExecutor`"); - - Self { inner } - } - - /// Run `f` on this database executor and return its result. - pub async fn run_job(&self, f: F) -> R where F: FnOnce(&mut S) -> R + Send + 'static, R: Send + 'static, @@ -486,7 +419,7 @@ impl SingleThreadedExecutor { self.inner .job_tx - .send(Box::new(move |state| { + .send(ExecutorJob::Sync(Box::new(move |state| { let result = std::panic::catch_unwind(AssertUnwindSafe(|| { let _entered = span.enter(); f(state) @@ -494,7 +427,7 @@ impl SingleThreadedExecutor { if let Err(Err(_panic)) = tx.send(result) { tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); match rx.await.unwrap() { @@ -503,8 +436,8 @@ impl SingleThreadedExecutor { } } - /// Enqueue `f` without waiting for its result. - pub fn enqueue_job(&self, f: F) + /// Enqueue a job for this database executor without waiting for its result. + pub fn enqueue_sync_job(&self, f: F) where F: FnOnce(&mut S) + Send + 'static, { @@ -512,7 +445,7 @@ impl SingleThreadedExecutor { self.inner .job_tx - .send(Box::new(move |state| { + .send(ExecutorJob::Sync(Box::new(move |state| { if std::panic::catch_unwind(AssertUnwindSafe(|| { let _entered = span.enter(); f(state); @@ -521,7 +454,7 @@ impl SingleThreadedExecutor { { tracing::warn!("uncaught panic on `SingleThreadedExecutor`") } - })) + }))) .unwrap_or_else(|_| panic!("job thread exited")); } } @@ -530,7 +463,7 @@ impl SingleThreadedExecutor { /// allowing databases from more-contended runtimes/cores to migrate there. #[derive(Default)] pub struct LoadBalanceOnDropGuard { - inner: Option<(Weak>, SingleCoreExecutorId)>, + inner: Option<(Weak>, SingleThreadedExecutorId)>, } impl Drop for LoadBalanceOnDropGuard {