From da5abaaa8efb578fec279c6b61e7f97dd964d0ab Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 16 Feb 2026 20:29:45 +0000 Subject: [PATCH 1/2] improve perf of merge_apply_inserts --- .../locking_tx_datastore/committed_state.rs | 70 ++++++++++++------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 089b47e68f9..1a394ae844c 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -1184,7 +1184,7 @@ impl CommittedState { &mut self, tx_data: &mut TxData, insert_tables: BTreeMap, - tx_blob_store: impl BlobStore, + tx_bs: impl BlobStore, truncates: &mut IntSet, ) { // TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state, @@ -1196,40 +1196,62 @@ impl CommittedState { // and the fullness of the page. for (table_id, tx_table) in insert_tables { - // For each newly-inserted row, serialize to a product value. - let mut inserts = Vec::with_capacity(tx_table.row_count as usize); - inserts.extend(tx_table.scan_rows(&tx_blob_store).map(|row| row.to_product_value())); - - // For each newly-inserted row, serialize to a product value. - // This doesn't apply to event tables, - // which are only recorded in `TxData`, - // but never the committed state. let schema = tx_table.get_schema(); - if !schema.is_event { + let page_pool = &self.page_pool; + if schema.is_event { + // For event tables, we don't want to insert into the committed state, + // we just want to include them in subscriptions and the commitlog. + Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |_| {}); + } else { let (commit_table, commit_blob_store, page_pool) = - self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema()); - for row in &inserts { + self.get_table_and_blob_store_or_create(table_id, schema); + Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |row| { commit_table .insert(page_pool, commit_blob_store, row) .expect("Failed to insert when merging commit"); - } + }); } + } + } - // Add the table to `TxData` if there were insertions. - if !inserts.is_empty() { - tx_data.set_inserts_for_table(table_id, &schema.table_name, inserts.into()); + /// Collects the inserted rows in `tx_table` into `tx_data`, + /// and applies `on_row` to each inserted row. + fn collect_inserts( + page_pool: &PagePool, + truncates: &mut IntSet, + tx_data: &mut TxData, + tx_blob_store: &impl BlobStore, + table_id: TableId, + tx_table: Table, + mut on_row: impl FnMut(&ProductValue), + ) { + // For each newly-inserted row, serialize to a product value. + // This bypasses the `Vec<_>` intermediary and constructs the `Arc<[_]>` directly, + // which matters somewhat for smaller transactions and more for larger transactions. + let mut inserts = Arc::new_uninit_slice(tx_table.row_count as usize); + let inserts_mut = Arc::get_mut(&mut inserts).expect("`Arc` should be unique as it was just created"); + for (row, slot) in tx_table.scan_rows(tx_blob_store).zip(inserts_mut) { + let row = row.to_product_value(); + on_row(&row); + slot.write(row); + } + // SAFETY: We've written to every slot in `inserts`, so it's now fully initialized. + let inserts = unsafe { inserts.assume_init() }; - // If table has inserted rows, it cannot be truncated. - if truncates.contains(&table_id) { - truncates.remove(&table_id); - } + // Add the table to `TxData` if there were insertions. + if !inserts.is_empty() { + tx_data.set_inserts_for_table(table_id, &tx_table.get_schema().table_name, inserts); + + // If table has inserted rows, it cannot be truncated. + if truncates.contains(&table_id) { + truncates.remove(&table_id); } + } - let (.., pages) = tx_table.consume_for_merge(); + let (.., pages) = tx_table.consume_for_merge(); - // Put all the pages in the table back into the pool. - self.page_pool.put_many(pages); - } + // Put all the pages in the table back into the pool. + page_pool.put_many(pages); } /// Rolls back the changes immediately made to the committed state during a transaction. From cfeee62063fb3c50d6a6fdd3a36171dd9ee0af84 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 16 Feb 2026 20:48:00 +0000 Subject: [PATCH 2/2] ReplicaContext: remove relational_db --- crates/core/src/client/client_connection.rs | 2 +- crates/core/src/client/message_handlers_v1.rs | 2 +- crates/core/src/client/message_handlers_v2.rs | 2 +- crates/core/src/host/host_controller.rs | 21 ++++++++----------- crates/core/src/host/instance_env.rs | 10 ++++----- crates/core/src/host/module_host.rs | 20 ++++++++++-------- .../src/host/wasm_common/module_host_actor.rs | 6 +++--- crates/core/src/replica_context.rs | 12 +++++++---- 8 files changed, 39 insertions(+), 36 deletions(-) diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 06ee274e104..6fb8d8e1623 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -139,7 +139,7 @@ impl DurableOffsetSupply for watch::Receiver { self.borrow() }; - Ok(module.replica_ctx().relational_db.durable_tx_offset()) + Ok(module.relational_db().durable_tx_offset()) } } diff --git a/crates/core/src/client/message_handlers_v1.rs b/crates/core/src/client/message_handlers_v1.rs index 742d9147553..d6cf8fc6257 100644 --- a/crates/core/src/client/message_handlers_v1.rs +++ b/crates/core/src/client/message_handlers_v1.rs @@ -40,7 +40,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = &module.replica_ctx().relational_db; + let db = module.relational_db(); let record_metrics = |wl| { move |metrics| { if let Some(metrics) = metrics { diff --git a/crates/core/src/client/message_handlers_v2.rs b/crates/core/src/client/message_handlers_v2.rs index ddc1c037a9b..5dd2f80d01b 100644 --- a/crates/core/src/client/message_handlers_v2.rs +++ b/crates/core/src/client/message_handlers_v2.rs @@ -24,7 +24,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = &module.replica_ctx().relational_db; + let db = module.relational_db(); let record_metrics = |wl| { move |metrics| { if let Some(metrics) = metrics { diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index d03edb4bd6e..7a28861abeb 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -375,7 +375,7 @@ impl HostController { on_panic(); }); - let db = module.replica_ctx().relational_db.clone(); + let db = module.relational_db().clone(); let result = module.on_module_thread_async("using_database", move || f(db)).await?; Ok(result) } @@ -541,9 +541,8 @@ impl HostController { info!("replica={replica_id} database={database_identity} exiting module"); module.exit().await; - let db = &module.replica_ctx().relational_db; info!("replica={replica_id} database={database_identity} exiting database"); - db.shutdown().await; + module.relational_db().shutdown().await; info!("replica={replica_id} database={database_identity} module host exited"); }) .await; @@ -672,8 +671,7 @@ async fn make_replica_ctx( send_worker_queue.clone(), ))); let downgraded = Arc::downgrade(&subscriptions); - let subscriptions = - ModuleSubscriptions::new(relational_db.clone(), subscriptions, send_worker_queue, bsatn_rlb_pool); + let subscriptions = ModuleSubscriptions::new(relational_db, subscriptions, send_worker_queue, bsatn_rlb_pool); // If an error occurs when evaluating a subscription, // we mark each client that was affected, @@ -694,7 +692,6 @@ async fn make_replica_ctx( replica_id, logger, subscriptions, - relational_db, }) } @@ -777,10 +774,10 @@ async fn launch_module( let db_identity = database.database_identity; let host_type = database.host_type; - let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db, bsatn_rlb_pool) + let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db.clone(), bsatn_rlb_pool) .await .map(Arc::new)?; - let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); + let (scheduler, scheduler_starter) = Scheduler::open(relational_db); let (program, module_host) = make_module_host( runtimes.clone(), host_type, @@ -998,7 +995,7 @@ impl Host { scheduler_starter.start(&module_host)?; let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); - let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone()); + let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone()); let module = watch::Sender::new(module_host); @@ -1088,7 +1085,7 @@ impl Host { core: AllocatedJobCore, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; - let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); + let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone()); let (program, module) = make_module_host( runtimes, @@ -1106,7 +1103,7 @@ impl Host { let old_module_info = self.module.borrow().info.clone(); let update_result = - update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?; + update_module(replica_ctx.relational_db(), &module, program, old_module_info, policy).await?; // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. @@ -1124,7 +1121,7 @@ impl Host { let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone())); // Disconnect all clients connected to the old module. - let connected_clients = replica_ctx.relational_db.connected_clients()?; + let connected_clients = replica_ctx.relational_db().connected_clients()?; for (identity, connection_id) in connected_clients { let client_actor_id = ClientActorId { identity, diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index a3f5bb9299c..3f19c5a30ef 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -273,7 +273,7 @@ impl InstanceEnv { } pub(crate) fn relational_db(&self) -> &Arc { - &self.replica_ctx.relational_db + self.replica_ctx.relational_db() } pub(crate) fn get_jwt_payload(&self, connection_id: ConnectionId) -> Result, NodesError> { @@ -703,9 +703,10 @@ impl InstanceEnv { )); } - let stdb = self.replica_ctx.relational_db.clone(); // TODO(procedure-tx): should we add a new workload, e.g., `AnonTx`? - let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + let tx = self + .relational_db() + .begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); self.tx.set_raw(tx); self.in_anon_tx = true; @@ -1300,7 +1301,7 @@ mod test { /// An `InstanceEnv` requires a `ReplicaContext`. /// For our purposes this is just a wrapper for `RelationalDB`. fn replica_ctx(relational_db: Arc) -> Result<(ReplicaContext, tokio::runtime::Runtime)> { - let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone()); + let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db); let logger = { let _rt = runtime.enter(); Arc::new(temp_logger()) @@ -1317,7 +1318,6 @@ mod test { replica_id: 0, logger, subscriptions: subs, - relational_db, }, runtime, )) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 39552b1385b..7a4676388fc 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -461,7 +461,7 @@ fn init_database_inner( ) -> anyhow::Result<(Option, bool)> { log::debug!("init database"); let timestamp = Timestamp::now(); - let stdb = &*replica_ctx.relational_db; + let stdb = replica_ctx.relational_db(); let logger = replica_ctx.logger.system_logger(); let owner_identity = replica_ctx.database.owner_identity; let host_type = replica_ctx.host_type; @@ -2096,7 +2096,7 @@ impl ModuleHost { into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage + Send + 'static, ) -> Result<(), anyhow::Error> { let replica_ctx = self.replica_ctx(); - let db = replica_ctx.relational_db.clone(); + let db = self.relational_db().clone(); let subscriptions = replica_ctx.subscriptions.clone(); log::debug!("One-off query: {query}"); let metrics = self @@ -2200,8 +2200,7 @@ impl ModuleHost { if let Some(metrics) = metrics { // Record the metrics for the one-off query - replica_ctx - .relational_db + self.relational_db() .exec_counters_for(WorkloadType::Sql) .record(&metrics); } @@ -2224,7 +2223,7 @@ impl ModuleHost { rlb_pool: impl 'static + Send + RowListBuilderSource, ) -> Result<(), anyhow::Error> { let replica_ctx = self.replica_ctx(); - let db = replica_ctx.relational_db.clone(); + let db = self.relational_db().clone(); let subscriptions = replica_ctx.subscriptions.clone(); log::debug!("One-off query: {query}"); let metrics = self @@ -2234,8 +2233,7 @@ impl ModuleHost { .await??; if let Some(metrics) = metrics { - replica_ctx - .relational_db + self.relational_db() .exec_counters_for(WorkloadType::Sql) .record(&metrics); } @@ -2341,7 +2339,7 @@ impl ModuleHost { /// for tables without primary keys. It is only used in the benchmarks. /// Note: this doesn't drop the table, it just clears it! pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> { - let db = &*self.replica_ctx().relational_db; + let db = self.relational_db(); db.with_auto_commit(Workload::Internal, |tx| { let tables = db.get_all_tables_mut(tx)?; @@ -2371,13 +2369,17 @@ impl ModuleHost { } pub fn durable_tx_offset(&self) -> Option { - self.replica_ctx().relational_db.durable_tx_offset() + self.relational_db().durable_tx_offset() } pub fn database_logger(&self) -> &Arc { &self.replica_ctx().logger } + pub fn relational_db(&self) -> &Arc { + self.replica_ctx().relational_db() + } + pub(crate) fn replica_ctx(&self) -> &ReplicaContext { match &*self.inner { ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(), diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index d265191b9b8..a7a0aa627d5 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -529,7 +529,7 @@ impl InstanceCommon { ) -> Result { let replica_ctx = inst.replica_ctx().clone(); let system_logger = replica_ctx.logger.system_logger(); - let stdb = &replica_ctx.relational_db; + let stdb = &replica_ctx.relational_db(); let plan: MigratePlan = match policy.try_migrate( self.info.database_identity, @@ -798,7 +798,7 @@ impl InstanceCommon { let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id); let replica_ctx = inst.replica_ctx(); - let stdb = &*replica_ctx.relational_db.clone(); + let stdb = replica_ctx.relational_db(); let info = self.info.clone(); let reducer_def = info.module_def.reducer_by_id(reducer_id); let reducer_name = &reducer_def.name; @@ -1203,7 +1203,7 @@ impl InstanceCommon { }; let replica_ctx = inst.replica_ctx(); - let stdb = &*replica_ctx.relational_db.clone(); + let stdb = replica_ctx.relational_db(); let res = match sender { Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows), None => stdb.materialize_anonymous_view(&mut tx, table_id, rows), diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 92d2019da0f..731288bb459 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -18,7 +18,6 @@ pub struct ReplicaContext { pub replica_id: u64, pub logger: Arc, pub subscriptions: ModuleSubscriptions, - pub relational_db: Arc, } impl ReplicaContext { @@ -26,7 +25,7 @@ impl ReplicaContext { /// /// An in-memory database will return `Ok(0)`. pub fn durability_size_on_disk(&self) -> io::Result { - self.relational_db.size_on_disk() + self.relational_db().size_on_disk() } /// The size of the log file. @@ -66,14 +65,19 @@ impl ReplicaContext { /// The size in bytes of all of the in-memory data of the database. pub fn mem_usage(&self) -> usize { - self.relational_db.size_in_memory() + self.relational_db().size_in_memory() } /// Update data size stats. pub fn update_gauges(&self) { - self.relational_db.update_data_size_metrics(); + self.relational_db().update_data_size_metrics(); self.subscriptions.update_gauges(); } + + /// Returns a reference to the relational database. + pub fn relational_db(&self) -> &Arc { + self.subscriptions.relational_db() + } } impl Deref for ReplicaContext {