Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions crates/bench/benches/index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use core::{any::type_name, hash::BuildHasherDefault, hint::black_box, iter::repeat_with, mem, time::Duration};
use core::{
any::type_name, hash::BuildHasherDefault, hash::Hash, hint::black_box, iter::repeat_with, mem, time::Duration,
};
use criterion::{
criterion_group, criterion_main,
measurement::{Measurement as _, WallTime},
Expand All @@ -14,13 +16,9 @@ use rand::{
use spacetimedb_lib::AlgebraicValue;
use spacetimedb_sats::{layout::Size, product, u256};
use spacetimedb_table::indexes::{PageIndex, PageOffset, RowPointer, SquashedOffset};
use spacetimedb_table::table_index::uniquemap::UniqueMap;
use spacetimedb_table::table_index::Index as _;
use spacetimedb_table::table_index::{
unique_direct_index::{ToFromUsize, UniqueDirectIndex},
KeySize,
};
use std::hash::Hash;
use spacetimedb_table::table_index::unique_btree_index::UniqueBTreeIndex;
use spacetimedb_table::table_index::unique_direct_index::{ToFromUsize, UniqueDirectIndex};
use spacetimedb_table::table_index::{Index as _, KeySize};

fn time<R>(body: impl FnOnce() -> R) -> Duration {
let start = WallTime.start();
Expand Down Expand Up @@ -201,7 +199,7 @@ trait Index: Clone {
}

#[derive(Clone)]
struct IBTree<K: KeySize<MemoStorage: Clone + Default>>(UniqueMap<K>);
struct IBTree<K: KeySize<MemoStorage: Clone + Default>>(UniqueBTreeIndex<K>);
impl<K: KeySize<MemoStorage: Clone + Default> + Clone + Eq + Hash + Ord> Index for IBTree<K> {
type K = K;
fn new() -> Self {
Expand Down
16 changes: 9 additions & 7 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ impl MetadataFile {
/// `self` is the metadata file read from a database, and current is
/// the default metadata file that the active database version would
/// right to a new database.
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
pub fn check_compatibility_and_update(mut self, current: Self, path: &Path) -> anyhow::Result<Self> {
anyhow::ensure!(
self.edition == current.edition,
"metadata.toml indicates that this database is from a different \
"metadata.toml at {} indicates that this database is from a different \
edition of SpacetimeDB (running {:?}, but this database is {:?})",
path.display(),
current.edition,
self.edition,
);
Expand All @@ -78,9 +79,10 @@ impl MetadataFile {
};
anyhow::ensure!(
cmp.matches(&current.version),
"metadata.toml indicates that this database is from a newer, \
"metadata.toml at {} indicates that this database is from a newer, \
incompatible version of SpacetimeDB (running {:?}, but this \
database is from {:?})",
path.display(),
current.version,
self.version,
);
Expand Down Expand Up @@ -165,24 +167,24 @@ mod tests {
fn check_metadata_compatibility_checking() {
assert_eq!(
mkmeta(1, 0, 0)
.check_compatibility_and_update(mkmeta(1, 0, 1))
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
.unwrap()
.version,
mkver(1, 0, 1)
);
assert_eq!(
mkmeta(1, 0, 1)
.check_compatibility_and_update(mkmeta(1, 0, 0))
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
.unwrap()
.version,
mkver(1, 0, 1)
);

mkmeta(1, 1, 0)
.check_compatibility_and_update(mkmeta(1, 0, 5))
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
.unwrap_err();
mkmeta(2, 0, 0)
.check_compatibility_and_update(mkmeta(1, 3, 5))
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
.unwrap_err();
}
}
29 changes: 11 additions & 18 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
};
Expand Down Expand Up @@ -57,11 +57,12 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::{RowRef, TableScanIter};
use spacetimedb_table::table_index::IndexKey;
use spacetimedb_vm::errors::{ErrorType, ErrorVm};
use spacetimedb_vm::ops::parse;
use std::borrow::Cow;
use std::io;
use std::ops::{Bound, RangeBounds};
use std::ops::RangeBounds;
use std::sync::Arc;
use tokio::sync::watch;

Expand Down Expand Up @@ -1391,32 +1392,24 @@ impl RelationalDB {
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
}

pub fn index_scan_range<'a>(
pub fn index_scan_range<'de, 'a>(
&'a self,
tx: &'a MutTx,
index_id: IndexId,
prefix: &[u8],
prefix: &'de [u8],
prefix_elems: ColId,
rstart: &[u8],
rend: &[u8],
) -> Result<
(
TableId,
Bound<AlgebraicValue>,
Bound<AlgebraicValue>,
impl Iterator<Item = RowRef<'a>>,
),
DBError,
> {
rstart: &'de [u8],
rend: &'de [u8],
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>), DBError> {
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
}

pub fn index_scan_point<'a>(
pub fn index_scan_point<'a, 'p>(
&'a self,
tx: &'a MutTx,
index_id: IndexId,
point: &[u8],
) -> Result<(TableId, AlgebraicValue, impl Iterator<Item = RowRef<'a>>), DBError> {
point: &'p [u8],
) -> Result<(TableId, IndexKey<'p>, impl Iterator<Item = RowRef<'a>>), DBError> {
Ok(tx.index_scan_point(index_id, point)?)
}

Expand Down
18 changes: 12 additions & 6 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, IndexScanPointOrRange, MutTxId};
use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_lib::{http as st_http, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
Expand Down Expand Up @@ -484,9 +484,12 @@ impl InstanceEnv {
let tx = &mut *self.get_tx()?;

// Find all rows in the table to delete.
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
// Re. `SmallVec`, `delete_by_field` only cares about 1 element, so optimize for that.
let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
let rows_to_delete = match iter {
IndexScanPointOrRange::Point(_, iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
IndexScanPointOrRange::Range(iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
};

Ok(Self::datastore_delete_by_index_scan(stdb, tx, table_id, rows_to_delete))
}
Expand Down Expand Up @@ -648,19 +651,22 @@ impl InstanceEnv {
let tx = &mut *self.get_tx()?;

// Open index iterator
let (table_id, lower, upper, iter) =
let (table_id, iter) =
self.relational_db()
.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;

// Scan the index and serialize rows to BSATN.
let (chunks, rows_scanned, bytes_scanned) = ChunkedWriter::collect_iter(pool, iter);
let (point, (chunks, rows_scanned, bytes_scanned)) = match iter {
IndexScanPointOrRange::Point(point, iter) => (Some(point), ChunkedWriter::collect_iter(pool, iter)),
IndexScanPointOrRange::Range(iter) => (None, ChunkedWriter::collect_iter(pool, iter)),
};

// Record the number of rows and the number of bytes scanned by the iterator.
tx.metrics.index_seeks += 1;
tx.metrics.bytes_scanned += bytes_scanned;
tx.metrics.rows_scanned += rows_scanned;

tx.record_index_scan_range(&self.func_type, table_id, index_id, lower, upper);
tx.record_index_scan_range(&self.func_type, table_id, index_id, point);

Ok(chunks)
}
Expand Down
63 changes: 14 additions & 49 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
execution_context::ExecutionContext,
locking_tx_datastore::{
mut_tx::ViewReadSets,
state_view::{iter_st_column_for_table, ApplyFilter, EqOnColumn, RangeOnColumn, ScanOrIndex},
state_view::{iter_st_column_for_table, ScanOrIndex},
IterByColRangeTx,
},
system_tables::{
Expand Down Expand Up @@ -50,8 +50,7 @@ use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
page_pool::PagePool,
table::{IndexScanPointIter, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex, TableScanIter},
table_index::IndexSeekRangeResult,
table::{InsertError, RowRef, Table, TableAndIndex, TableScanIter},
};
use std::collections::BTreeMap;
use std::sync::Arc;
Expand Down Expand Up @@ -219,12 +218,12 @@ impl StateView for CommittedState {
cols: ColList,
range: R,
) -> Result<Self::IterByColRange<'_, R>> {
match self.index_seek_range(table_id, &cols, &range) {
let iter = self
.get_index_by_cols(table_id, &cols)
.map(|i| i.seek_range_via_algebraic_value(&range));
match iter {
Some(Ok(iter)) => Ok(ScanOrIndex::Index(iter)),
None | Some(Err(_)) => Ok(ScanOrIndex::Scan(ApplyFilter::new(
RangeOnColumn { cols, range },
self.iter(table_id)?,
))),
None | Some(Err(_)) => Ok(ScanOrIndex::scan_range(cols, range, self.iter(table_id)?)),
}
}

Expand All @@ -235,12 +234,12 @@ impl StateView for CommittedState {
val: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
let cols = cols.into();
match self.index_seek_point(table_id, &cols, val) {
let iter = self
.get_index_by_cols(table_id, &cols)
.map(|i| i.seek_point_via_algebraic_value(val));
match iter {
Some(iter) => Ok(ScanOrIndex::Index(iter)),
None => Ok(ScanOrIndex::Scan(ApplyFilter::new(
EqOnColumn { cols, val },
self.iter(table_id)?,
))),
None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)),
}
}

Expand Down Expand Up @@ -949,45 +948,11 @@ impl CommittedState {
Some(self.get_table(table_id)?.scan_rows(&self.blob_store))
}

/// When there's an index on `cols`,
/// returns an iterator over the [TableIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`
/// when `range` is a point.
/// When there is no index this returns `None`.
pub(super) fn index_seek_range<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexSeekRangeResult<IndexScanRangeIter<'a>>> {
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_range(range))
}

/// When there's an index on `cols`,
/// returns an iterator over the [TableIndex] that yields all the [`RowRef`]s
/// that equal `value` in the indexed column.
///
/// Matching is defined by `Eq for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek_point<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
value: &AlgebraicValue,
) -> Option<IndexScanPointIter<'a>> {
/// Returns an index for `table_id` on `cols`, if any.
pub(super) fn get_index_by_cols(&self, table_id: TableId, cols: &ColList) -> Option<TableAndIndex<'_>> {
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_point(value))
}

/// Returns the table associated with the given `index_id`, if any.
Expand Down
2 changes: 1 addition & 1 deletion crates/datastore/src/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub mod committed_state;
pub mod datastore;
mod mut_tx;
pub use mut_tx::{FuncCallType, MutTxId, ViewCallInfo};
pub use mut_tx::{FuncCallType, IndexScanPointOrRange, MutTxId, ViewCallInfo};
mod sequence;
pub mod state_view;
pub use state_view::{IterByColEqTx, IterByColRangeTx};
Expand Down
Loading