Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ jwks = { package = "spacetimedb-jwks", version = "0.1.3" }
lazy_static = "1.4.0"
lean_string = "0.5.1"
log = "0.4.17"
libc = "0.2.182"
memchr = "2"
mimalloc = "0.1.39"
names = "0.14"
Expand Down Expand Up @@ -382,6 +383,7 @@ features = [
"ondemand",
]


[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

Expand Down
16 changes: 10 additions & 6 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use log::{debug, warn};
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
use spacetimedb_fs_utils::direct_io;
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
use tempfile::NamedTempFile;

Expand All @@ -29,7 +30,7 @@ pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static;
/// Size on disk of a [Fs] repo.
///
/// Created by [Fs::size_on_disk].
#[derive(Clone, Copy, Default)]
#[derive(Clone, Copy, Debug, Default)]
pub struct SizeOnDisk {
/// The total size in bytes of all segments and offset indexes in the repo.
pub total_bytes: u64,
Expand Down Expand Up @@ -156,7 +157,7 @@ impl FileLike for NamedTempFile {

impl Repo for Fs {
type SegmentWriter = File;
type SegmentReader = CompressReader;
type SegmentReader = CompressReader<File>;

fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
File::options()
Expand Down Expand Up @@ -213,18 +214,21 @@ impl Repo for Fs {
}

fn compress_segment(&self, offset: u64) -> io::Result<()> {
let src = self.open_segment_reader(offset)?;
let segment_path = self.segment_path(offset);
let src = direct_io::file_reader(&segment_path).and_then(CompressReader::new)?;
// if it's already compressed, leave it be
let CompressReader::None(mut src) = src else {
return Ok(());
};

let mut dst = NamedTempFile::new_in(&self.root)?;
let tmp = NamedTempFile::new_in(&self.root)?.into_temp_path();
let mut dst = direct_io::file_writer(&tmp)?;
// bytes per frame. in the future, it might be worth looking into putting
// every commit into its own frame, to make seeking more efficient.
let max_frame_size = 0x1000;
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
dst.persist(self.segment_path(offset))?;
dst.get_ref().sync_all()?;
tmp.persist(segment_path)?;

Ok(())
}
Expand Down Expand Up @@ -266,7 +270,7 @@ impl Repo for Fs {
}
}

impl SegmentLen for CompressReader {}
impl<R: io::Read + io::Seek> SegmentLen for CompressReader<R> {}

#[cfg(feature = "streaming")]
impl crate::stream::AsyncRepo for Fs {
Expand Down
11 changes: 8 additions & 3 deletions crates/commitlog/tests/random_payload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ fn compression() {
let clog = Commitlog::open(
CommitLogDir::from_path_unchecked(root.path()),
Options {
max_segment_size: 8 * 1024,
max_segment_size: 16 * 1024,
max_records_in_commit: NonZeroU16::MIN,
..Options::default()
},
Expand All @@ -97,7 +97,7 @@ fn compression() {

// try to generate commitlogs that will be amenable to compression -
// random data doesn't compress well, so try and have there be repetition
let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::<Vec<_>>();
let payloads = (0..4).map(|_| gen_payload()).cycle().take(1500).collect::<Vec<_>>();
for payload in &payloads {
clog.append_maybe_flush(*payload).unwrap();
}
Expand All @@ -111,7 +111,12 @@ fn compression() {
clog.compress_segments(segments_to_compress).unwrap();

let compressed_size = clog.size_on_disk().unwrap();
assert!(compressed_size.total_bytes < uncompressed_size.total_bytes);
assert!(
compressed_size.total_bytes < uncompressed_size.total_bytes,
"expected total size to be smaller after compression: uncompressed={:?} compressed={:?}",
uncompressed_size,
compressed_size
);

assert!(clog
.transactions(&payload::ArrayDecoder)
Expand Down
6 changes: 6 additions & 0 deletions crates/fs-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ thiserror.workspace = true
tokio.workspace = true
zstd-framed.workspace = true

[target.'cfg(unix)'.dependencies]
libc.workspace = true

[target.'cfg(windows)'.dependencies]
windows-sys.workspace = true

[dev-dependencies]
tempdir.workspace = true

Expand Down
34 changes: 18 additions & 16 deletions crates/fs-utils/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ impl CompressType {
}

/// A reader that can read compressed files
pub enum CompressReader {
None(BufReader<File>),
Zstd(Box<ZstdReader<'static, BufReader<File>>>),
pub enum CompressReader<R> {
None(BufReader<R>),
Zstd(Box<ZstdReader<'static, BufReader<R>>>),
}

impl CompressReader {
impl<R: Read + Seek> CompressReader<R> {
/// Create a new CompressReader from a seekable reader, such as a File.
///
/// It will detect the compression type using `magic bytes` and return the appropriate reader.
///
/// **Note**: The reader will be returned to its original position after detecting the compression type.
pub fn new(mut inner: File) -> io::Result<Self> {
pub fn new(mut inner: R) -> io::Result<Self> {
let current_pos = inner.stream_position()?;

let mut magic_bytes = [0u8; 4];
Expand All @@ -85,14 +85,6 @@ impl CompressReader {
})
}

pub fn file_size(&self) -> io::Result<usize> {
Ok(match self {
Self::None(inner) => inner.get_ref().metadata()?.len() as usize,
//TODO: Can't see how to get the file size from ZstdReader
Self::Zstd(_inner) => 0,
})
}

pub fn compress_type(&self) -> CompressType {
match self {
CompressReader::None(_) => CompressType::None,
Expand All @@ -105,7 +97,17 @@ impl CompressReader {
}
}

impl Read for CompressReader {
impl CompressReader<File> {
    /// Reports the size of the underlying file.
    ///
    /// For an uncompressed reader this is the length taken from the file's
    /// metadata, cast to `usize`. For a zstd reader no size is available
    /// and `0` is returned as a placeholder.
    ///
    /// # Errors
    ///
    /// Propagates any error from querying the file's metadata.
    pub fn file_size(&self) -> io::Result<usize> {
        match self {
            Self::None(reader) => {
                let meta = reader.get_ref().metadata()?;
                // NOTE(review): `len()` is a `u64`; this cast truncates on
                // targets where `usize` is narrower than 64 bits.
                Ok(meta.len() as usize)
            }
            //TODO: Can't see how to get the file size from ZstdReader
            Self::Zstd(_) => Ok(0),
        }
    }
}

impl<R: Read> Read for CompressReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
CompressReader::None(inner) => inner.read(buf),
Expand All @@ -114,7 +116,7 @@ impl Read for CompressReader {
}
}

impl io::BufRead for CompressReader {
impl<R: Read> io::BufRead for CompressReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
match self {
CompressReader::None(inner) => inner.fill_buf(),
Expand All @@ -130,7 +132,7 @@ impl io::BufRead for CompressReader {
}
}

impl Seek for CompressReader {
impl<R: Read + Seek> Seek for CompressReader<R> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
match self {
CompressReader::None(inner) => inner.seek(pos),
Expand Down
2 changes: 1 addition & 1 deletion crates/fs-utils/src/dir_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl DirTrie {
/// It will be decompressed based on the file's magic bytes.
///
/// It will be opened with [`o_rdonly`].
pub fn open_entry_reader(&self, file_id: &FileId) -> Result<CompressReader, io::Error> {
pub fn open_entry_reader(&self, file_id: &FileId) -> Result<CompressReader<File>, io::Error> {
let path = self.file_path(file_id);
Self::create_parent(&path)?;
CompressReader::new(o_rdonly().open(path)?)
Expand Down
86 changes: 86 additions & 0 deletions crates/fs-utils/src/direct_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
mod page;
mod reader;
mod writer;

use std::{
fs::{File, OpenOptions},
io,
path::Path,
};

pub use self::{page::Page, reader::AlignedBufReader, writer::AlignedBufWriter};

/// Opens the file at `path` with direct I/O enabled, using the access mode
/// and creation flags configured in `opts`.
///
/// Direct I/O (`O_DIRECT` or the platform's equivalent) requires, on all
/// supported platforms, that memory buffers and file offsets are aligned to
/// the logical block size of the filesystem. Wrap the returned [File] in an
/// [AlignedBufReader] or an [AlignedBufWriter] to have that alignment handled
/// for you.
///
/// `opts` is taken by mutable reference because, on platforms where direct
/// I/O is requested through an open flag, that flag is set on `opts` itself
/// before the file is opened.
///
/// # Platform differences
///
/// * Unix (except macOS):
///
///   The file is opened with the `O_DIRECT` flag.
///
/// * macOS:
///
///   The `F_NOCACHE` fcntl is set on the file once it has been opened.
///   It may be necessary to set `F_PREALLOCATE` as well[1].
///
/// * Windows:
///
///   The file is opened with [FILE_FLAG_NO_BUFFERING].
///
/// [1]: https://forums.developer.apple.com/forums/thread/25464
/// [FILE_FLAG_NO_BUFFERING]: https://docs.microsoft.com/en-us/windows/win32/fileio/file-buffering
pub fn open_file(path: impl AsRef<Path>, opts: &mut OpenOptions) -> io::Result<File> {
    let path: &Path = path.as_ref();
    open_file_impl(path, opts)
}

/// Opens the file at `path` read-only with direct I/O enabled (see
/// [open_file]) and wraps it in an [AlignedBufReader].
///
/// # Errors
///
/// Returns the error from [open_file] if the file cannot be opened.
pub fn file_reader(path: impl AsRef<Path>) -> io::Result<AlignedBufReader<File>> {
    let mut read_only = OpenOptions::new();
    read_only.read(true);

    let file = open_file(path, &mut read_only)?;
    Ok(AlignedBufReader::new(file))
}

/// Opens the file at `path` for writing with direct I/O enabled (see
/// [open_file]) and wraps it in an [AlignedBufWriter].
///
/// A missing file is created; an existing file is truncated.
///
/// # Errors
///
/// Returns the error from [open_file] if the file cannot be opened.
pub fn file_writer(path: impl AsRef<Path>) -> io::Result<AlignedBufWriter<File>> {
    let mut create_truncate = OpenOptions::new();
    create_truncate.create(true).write(true).truncate(true);

    let file = open_file(path, &mut create_truncate)?;
    Ok(AlignedBufWriter::new(file))
}

/// Unix (non-macOS) backend of [open_file]: requests direct I/O by adding
/// `O_DIRECT` to the open flags.
///
/// The flag is set on `opts` itself, so it remains set if the caller reuses
/// `opts`. Per the standard library documentation, `custom_flags` replaces
/// any custom flags previously set on `opts`.
#[cfg(all(unix, not(target_os = "macos")))]
fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt as _;

    opts.custom_flags(libc::O_DIRECT).open(path)
}

/// macOS backend of [open_file]: opens the file normally, then sets the
/// `F_NOCACHE` fcntl on the resulting descriptor.
///
/// # Errors
///
/// Returns the error from opening the file, or the OS error (`errno`)
/// reported by `fcntl` if setting `F_NOCACHE` fails. In the latter case the
/// freshly opened file is closed when it is dropped.
#[cfg(target_os = "macos")]
fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result<File> {
    use libc::{fcntl, F_NOCACHE};
    use std::os::fd::AsRawFd;

    let file = opts.open(path)?;
    // SAFETY: `file` is open and owned by this function for the duration of
    // the call, so its descriptor is valid. `F_NOCACHE` takes a plain integer
    // argument and does not read or write any memory we pass in.
    let ret = unsafe { fcntl(file.as_raw_fd(), F_NOCACHE, 1) };
    // `fcntl` signals failure by returning -1 and storing the cause in
    // `errno`; the return value itself is not an error code.
    if ret == -1 {
        return Err(io::Error::last_os_error());
    }

    Ok(file)
}

/// Windows backend of [open_file]: requests unbuffered I/O by opening the
/// file with `FILE_FLAG_NO_BUFFERING`.
///
/// The flag is set on `opts` itself, so it remains set if the caller reuses
/// `opts`.
#[cfg(target_os = "windows")]
fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result<File> {
    use std::os::windows::fs::OpenOptionsExt as _;
    use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_NO_BUFFERING;

    opts.custom_flags(FILE_FLAG_NO_BUFFERING).open(path)
}
Loading
Loading