Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl DurableOffsetSupply for watch::Receiver<ModuleHost> {
self.borrow()
};

Ok(module.replica_ctx().relational_db.durable_tx_offset())
Ok(module.relational_db().durable_tx_offset())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client/message_handlers_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
let database_identity = mod_info.database_identity;
let db = &module.replica_ctx().relational_db;
let db = module.relational_db();
let record_metrics = |wl| {
move |metrics| {
if let Some(metrics) = metrics {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client/message_handlers_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
let database_identity = mod_info.database_identity;
let db = &module.replica_ctx().relational_db;
let db = module.relational_db();
let record_metrics = |wl| {
move |metrics| {
if let Some(metrics) = metrics {
Expand Down
21 changes: 9 additions & 12 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl HostController {
on_panic();
});

let db = module.replica_ctx().relational_db.clone();
let db = module.relational_db().clone();
let result = module.on_module_thread_async("using_database", move || f(db)).await?;
Ok(result)
}
Expand Down Expand Up @@ -541,9 +541,8 @@ impl HostController {

info!("replica={replica_id} database={database_identity} exiting module");
module.exit().await;
let db = &module.replica_ctx().relational_db;
info!("replica={replica_id} database={database_identity} exiting database");
db.shutdown().await;
module.relational_db().shutdown().await;
info!("replica={replica_id} database={database_identity} module host exited");
})
.await;
Expand Down Expand Up @@ -672,8 +671,7 @@ async fn make_replica_ctx(
send_worker_queue.clone(),
)));
let downgraded = Arc::downgrade(&subscriptions);
let subscriptions =
ModuleSubscriptions::new(relational_db.clone(), subscriptions, send_worker_queue, bsatn_rlb_pool);
let subscriptions = ModuleSubscriptions::new(relational_db, subscriptions, send_worker_queue, bsatn_rlb_pool);

// If an error occurs when evaluating a subscription,
// we mark each client that was affected,
Expand All @@ -694,7 +692,6 @@ async fn make_replica_ctx(
replica_id,
logger,
subscriptions,
relational_db,
})
}

Expand Down Expand Up @@ -777,10 +774,10 @@ async fn launch_module(
let db_identity = database.database_identity;
let host_type = database.host_type;

let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db, bsatn_rlb_pool)
let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db.clone(), bsatn_rlb_pool)
.await
.map(Arc::new)?;
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
let (scheduler, scheduler_starter) = Scheduler::open(relational_db);
let (program, module_host) = make_module_host(
runtimes.clone(),
host_type,
Expand Down Expand Up @@ -998,7 +995,7 @@ impl Host {

scheduler_starter.start(&module_host)?;
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone());
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());

let module = watch::Sender::new(module_host);

Expand Down Expand Up @@ -1088,7 +1085,7 @@ impl Host {
core: AllocatedJobCore,
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = &self.replica_ctx;
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone());

let (program, module) = make_module_host(
runtimes,
Expand All @@ -1106,7 +1103,7 @@ impl Host {
let old_module_info = self.module.borrow().info.clone();

let update_result =
update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?;
update_module(replica_ctx.relational_db(), &module, program, old_module_info, policy).await?;

// Only replace the module + scheduler if the update succeeded.
// Otherwise, we want the database to continue running with the old state.
Expand All @@ -1124,7 +1121,7 @@ impl Host {
let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone()));

// Disconnect all clients connected to the old module.
let connected_clients = replica_ctx.relational_db.connected_clients()?;
let connected_clients = replica_ctx.relational_db().connected_clients()?;
for (identity, connection_id) in connected_clients {
let client_actor_id = ClientActorId {
identity,
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl InstanceEnv {
}

pub(crate) fn relational_db(&self) -> &Arc<RelationalDB> {
&self.replica_ctx.relational_db
self.replica_ctx.relational_db()
}

pub(crate) fn get_jwt_payload(&self, connection_id: ConnectionId) -> Result<Option<String>, NodesError> {
Expand Down Expand Up @@ -703,9 +703,10 @@ impl InstanceEnv {
));
}

let stdb = self.replica_ctx.relational_db.clone();
// TODO(procedure-tx): should we add a new workload, e.g., `AnonTx`?
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
let tx = self
.relational_db()
.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
self.tx.set_raw(tx);
self.in_anon_tx = true;

Expand Down Expand Up @@ -1300,7 +1301,7 @@ mod test {
/// An `InstanceEnv` requires a `ReplicaContext`.
/// For our purposes this is just a wrapper for `RelationalDB`.
fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<(ReplicaContext, tokio::runtime::Runtime)> {
let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone());
let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db);
let logger = {
let _rt = runtime.enter();
Arc::new(temp_logger())
Expand All @@ -1317,7 +1318,6 @@ mod test {
replica_id: 0,
logger,
subscriptions: subs,
relational_db,
},
runtime,
))
Expand Down
20 changes: 11 additions & 9 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ fn init_database_inner(
) -> anyhow::Result<(Option<ReducerCallResult>, bool)> {
log::debug!("init database");
let timestamp = Timestamp::now();
let stdb = &*replica_ctx.relational_db;
let stdb = replica_ctx.relational_db();
let logger = replica_ctx.logger.system_logger();
let owner_identity = replica_ctx.database.owner_identity;
let host_type = replica_ctx.host_type;
Expand Down Expand Up @@ -2096,7 +2096,7 @@ impl ModuleHost {
into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
) -> Result<(), anyhow::Error> {
let replica_ctx = self.replica_ctx();
let db = replica_ctx.relational_db.clone();
let db = self.relational_db().clone();
let subscriptions = replica_ctx.subscriptions.clone();
log::debug!("One-off query: {query}");
let metrics = self
Expand Down Expand Up @@ -2200,8 +2200,7 @@ impl ModuleHost {

if let Some(metrics) = metrics {
// Record the metrics for the one-off query
replica_ctx
.relational_db
self.relational_db()
.exec_counters_for(WorkloadType::Sql)
.record(&metrics);
}
Expand All @@ -2224,7 +2223,7 @@ impl ModuleHost {
rlb_pool: impl 'static + Send + RowListBuilderSource<ws_v1::BsatnFormat>,
) -> Result<(), anyhow::Error> {
let replica_ctx = self.replica_ctx();
let db = replica_ctx.relational_db.clone();
let db = self.relational_db().clone();
let subscriptions = replica_ctx.subscriptions.clone();
log::debug!("One-off query: {query}");
let metrics = self
Expand All @@ -2234,8 +2233,7 @@ impl ModuleHost {
.await??;

if let Some(metrics) = metrics {
replica_ctx
.relational_db
self.relational_db()
.exec_counters_for(WorkloadType::Sql)
.record(&metrics);
}
Expand Down Expand Up @@ -2341,7 +2339,7 @@ impl ModuleHost {
/// for tables without primary keys. It is only used in the benchmarks.
/// Note: this doesn't drop the table, it just clears it!
pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
let db = &*self.replica_ctx().relational_db;
let db = self.relational_db();

db.with_auto_commit(Workload::Internal, |tx| {
let tables = db.get_all_tables_mut(tx)?;
Expand Down Expand Up @@ -2371,13 +2369,17 @@ impl ModuleHost {
}

pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
self.replica_ctx().relational_db.durable_tx_offset()
self.relational_db().durable_tx_offset()
}

    /// Returns the [`DatabaseLogger`] of this module's replica context.
    ///
    /// This borrows the `Arc` held by the [`ReplicaContext`]; callers that
    /// need ownership can clone the returned `Arc` cheaply (refcount bump).
    pub fn database_logger(&self) -> &Arc<DatabaseLogger> {
        &self.replica_ctx().logger
    }

    /// Returns the [`RelationalDB`] of this module's replica context.
    ///
    /// Convenience accessor so callers can write `module.relational_db()`
    /// instead of reaching through `replica_ctx()` themselves.
    pub fn relational_db(&self) -> &Arc<RelationalDB> {
        self.replica_ctx().relational_db()
    }

pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
match &*self.inner {
ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(),
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl InstanceCommon {
) -> Result<UpdateDatabaseResult, anyhow::Error> {
let replica_ctx = inst.replica_ctx().clone();
let system_logger = replica_ctx.logger.system_logger();
let stdb = &replica_ctx.relational_db;
let stdb = &replica_ctx.relational_db();

let plan: MigratePlan = match policy.try_migrate(
self.info.database_identity,
Expand Down Expand Up @@ -798,7 +798,7 @@ impl InstanceCommon {
let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id);

let replica_ctx = inst.replica_ctx();
let stdb = &*replica_ctx.relational_db.clone();
let stdb = replica_ctx.relational_db();
let info = self.info.clone();
let reducer_def = info.module_def.reducer_by_id(reducer_id);
let reducer_name = &reducer_def.name;
Expand Down Expand Up @@ -1203,7 +1203,7 @@ impl InstanceCommon {
};

let replica_ctx = inst.replica_ctx();
let stdb = &*replica_ctx.relational_db.clone();
let stdb = replica_ctx.relational_db();
let res = match sender {
Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows),
None => stdb.materialize_anonymous_view(&mut tx, table_id, rows),
Expand Down
12 changes: 8 additions & 4 deletions crates/core/src/replica_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ pub struct ReplicaContext {
pub replica_id: u64,
pub logger: Arc<DatabaseLogger>,
pub subscriptions: ModuleSubscriptions,
pub relational_db: Arc<RelationalDB>,
}

impl ReplicaContext {
/// The number of bytes on disk occupied by the database's durability layer.
///
/// An in-memory database will return `Ok(0)`.
pub fn durability_size_on_disk(&self) -> io::Result<SizeOnDisk> {
self.relational_db.size_on_disk()
self.relational_db().size_on_disk()
}

/// The size of the log file.
Expand Down Expand Up @@ -66,14 +65,19 @@ impl ReplicaContext {

/// The size in bytes of all of the in-memory data of the database.
pub fn mem_usage(&self) -> usize {
self.relational_db.size_in_memory()
self.relational_db().size_in_memory()
}

/// Update data size stats.
pub fn update_gauges(&self) {
self.relational_db.update_data_size_metrics();
self.relational_db().update_data_size_metrics();
self.subscriptions.update_gauges();
}

    /// Returns a reference to the relational database backing this replica.
    ///
    /// `ReplicaContext` no longer stores its own `Arc<RelationalDB>`;
    /// the canonical handle lives inside `subscriptions`
    /// (`ModuleSubscriptions`), so this accessor delegates to it.
    pub fn relational_db(&self) -> &Arc<RelationalDB> {
        self.subscriptions.relational_db()
    }
}

impl Deref for ReplicaContext {
Expand Down
70 changes: 46 additions & 24 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ impl CommittedState {
&mut self,
tx_data: &mut TxData,
insert_tables: BTreeMap<TableId, Table>,
tx_blob_store: impl BlobStore,
tx_bs: impl BlobStore,
truncates: &mut IntSet<TableId>,
) {
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
Expand All @@ -1196,40 +1196,62 @@ impl CommittedState {
// and the fullness of the page.

for (table_id, tx_table) in insert_tables {
// For each newly-inserted row, serialize to a product value.
let mut inserts = Vec::with_capacity(tx_table.row_count as usize);
inserts.extend(tx_table.scan_rows(&tx_blob_store).map(|row| row.to_product_value()));

// For each newly-inserted row, serialize to a product value.
// This doesn't apply to event tables,
// which are only recorded in `TxData`,
// but never the committed state.
let schema = tx_table.get_schema();
if !schema.is_event {
let page_pool = &self.page_pool;
if schema.is_event {
// For event tables, we don't want to insert into the committed state,
// we just want to include them in subscriptions and the commitlog.
Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |_| {});
} else {
let (commit_table, commit_blob_store, page_pool) =
self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema());
for row in &inserts {
self.get_table_and_blob_store_or_create(table_id, schema);
Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |row| {
commit_table
.insert(page_pool, commit_blob_store, row)
.expect("Failed to insert when merging commit");
}
});
}
}
}

// Add the table to `TxData` if there were insertions.
if !inserts.is_empty() {
tx_data.set_inserts_for_table(table_id, &schema.table_name, inserts.into());
/// Collects the inserted rows in `tx_table` into `tx_data`,
/// and applies `on_row` to each inserted row.
fn collect_inserts(
page_pool: &PagePool,
truncates: &mut IntSet<TableId>,
tx_data: &mut TxData,
tx_blob_store: &impl BlobStore,
table_id: TableId,
tx_table: Table,
mut on_row: impl FnMut(&ProductValue),
) {
// For each newly-inserted row, serialize to a product value.
// This bypasses the `Vec<_>` intermediary and constructs the `Arc<[_]>` directly,
// which matters somewhat for smaller transactions and more for larger transactions.
let mut inserts = Arc::new_uninit_slice(tx_table.row_count as usize);
let inserts_mut = Arc::get_mut(&mut inserts).expect("`Arc` should be unique as it was just created");
for (row, slot) in tx_table.scan_rows(tx_blob_store).zip(inserts_mut) {
let row = row.to_product_value();
on_row(&row);
slot.write(row);
}
// SAFETY: We've written to every slot in `inserts`, so it's now fully initialized.
let inserts = unsafe { inserts.assume_init() };

// If table has inserted rows, it cannot be truncated.
if truncates.contains(&table_id) {
truncates.remove(&table_id);
}
// Add the table to `TxData` if there were insertions.
if !inserts.is_empty() {
tx_data.set_inserts_for_table(table_id, &tx_table.get_schema().table_name, inserts);

// If table has inserted rows, it cannot be truncated.
if truncates.contains(&table_id) {
truncates.remove(&table_id);
}
}

let (.., pages) = tx_table.consume_for_merge();
let (.., pages) = tx_table.consume_for_merge();

// Put all the pages in the table back into the pool.
self.page_pool.put_many(pages);
}
// Put all the pages in the table back into the pool.
page_pool.put_many(pages);
}

/// Rolls back the changes immediately made to the committed state during a transaction.
Expand Down
Loading