diff --git a/Cargo.lock b/Cargo.lock index d3172c86728..f330467d1cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3633,9 +3633,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.177" +version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" [[package]] name = "libgit2-sys" @@ -8024,10 +8024,12 @@ dependencies = [ "anyhow", "fs2", "hex", + "libc", "rand 0.9.2", "tempdir", "thiserror 1.0.69", "tokio", + "windows-sys 0.59.0", "zstd-framed", ] diff --git a/Cargo.toml b/Cargo.toml index 006d5afc6de..fa7dc11e136 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -227,6 +227,7 @@ jwks = { package = "spacetimedb-jwks", version = "0.1.3" } lazy_static = "1.4.0" lean_string = "0.5.1" log = "0.4.17" +libc = "0.2.182" memchr = "2" mimalloc = "0.1.39" names = "0.14" @@ -382,6 +383,7 @@ features = [ "ondemand", ] + [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 7dbaf56fc78..e19183eb1a0 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use log::{debug, warn}; use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader}; +use spacetimedb_fs_utils::direct_io; use spacetimedb_paths::server::{CommitLogDir, SegmentFile}; use tempfile::NamedTempFile; @@ -29,7 +30,7 @@ pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static; /// Size on disk of a [Fs] repo. /// /// Created by [Fs::size_on_disk]. -#[derive(Clone, Copy, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct SizeOnDisk { /// The total size in bytes of all segments and offset indexes in the repo. pub total_bytes: u64, @@ -156,7 +157,7 @@ impl FileLike for NamedTempFile { impl Repo for Fs { type SegmentWriter = File; - type SegmentReader = CompressReader; + type SegmentReader = CompressReader; fn create_segment(&self, offset: u64) -> io::Result { File::options() @@ -213,18 +214,21 @@ impl Repo for Fs { } fn compress_segment(&self, offset: u64) -> io::Result<()> { - let src = self.open_segment_reader(offset)?; + let segment_path = self.segment_path(offset); + let src = direct_io::file_reader(&segment_path).and_then(CompressReader::new)?; // if it's already compressed, leave it be let CompressReader::None(mut src) = src else { return Ok(()); }; - let mut dst = NamedTempFile::new_in(&self.root)?; + let tmp = NamedTempFile::new_in(&self.root)?.into_temp_path(); + let mut dst = direct_io::file_writer(&tmp)?; // bytes per frame. in the future, it might be worth looking into putting // every commit into its own frame, to make seeking more efficient. let max_frame_size = 0x1000; compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; - dst.persist(self.segment_path(offset))?; + dst.get_ref().sync_all()?; + tmp.persist(segment_path)?; Ok(()) } @@ -266,7 +270,7 @@ impl Repo for Fs { } } -impl SegmentLen for CompressReader {} +impl SegmentLen for CompressReader {} #[cfg(feature = "streaming")] impl crate::stream::AsyncRepo for Fs { diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 85ab653480d..dfa6fed1f47 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -87,7 +87,7 @@ fn compression() { let clog = Commitlog::open( CommitLogDir::from_path_unchecked(root.path()), Options { - max_segment_size: 8 * 1024, + max_segment_size: 16 * 1024, max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, @@ -97,7 +97,7 @@ fn compression() { // try to generate commitlogs that will be amenable to compression - // random data doesn't compress well, so try and have there be repetition - let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::>(); + let payloads = (0..4).map(|_| gen_payload()).cycle().take(1500).collect::>(); for payload in &payloads { clog.append_maybe_flush(*payload).unwrap(); } @@ -111,7 +111,12 @@ fn compression() { clog.compress_segments(segments_to_compress).unwrap(); let compressed_size = clog.size_on_disk().unwrap(); - assert!(compressed_size.total_bytes < uncompressed_size.total_bytes); + assert!( + compressed_size.total_bytes < uncompressed_size.total_bytes, + "expected total size to be smaller after compression: uncompressed={:?} compressed={:?}", + uncompressed_size, + compressed_size + ); assert!(clog .transactions(&payload::ArrayDecoder) diff --git a/crates/fs-utils/Cargo.toml b/crates/fs-utils/Cargo.toml index 56ebd4f8c32..e4f57f19c2f 100644 --- a/crates/fs-utils/Cargo.toml +++ b/crates/fs-utils/Cargo.toml @@ -15,6 +15,12 @@ thiserror.workspace = true tokio.workspace = true zstd-framed.workspace = true +[target.'cfg(unix)'.dependencies] +libc.workspace = true + +[target.'cfg(windows)'.dependencies] +windows-sys.workspace = true + [dev-dependencies] tempdir.workspace = true diff --git a/crates/fs-utils/src/compression.rs b/crates/fs-utils/src/compression.rs index 8e72fcbe76d..eb036ad618e 100644 --- a/crates/fs-utils/src/compression.rs +++ b/crates/fs-utils/src/compression.rs @@ -47,18 +47,18 @@ impl CompressType { } /// A reader that can read compressed files -pub enum CompressReader { - None(BufReader), - Zstd(Box>>), +pub enum CompressReader { + None(BufReader), + Zstd(Box>>), } -impl CompressReader { +impl CompressReader { /// Create a new CompressReader from a File /// /// It will detect the compression type using `magic bytes` and return the appropriate reader. /// /// **Note**: The reader will be return to the original position after detecting the compression type. - pub fn new(mut inner: File) -> io::Result { + pub fn new(mut inner: R) -> io::Result { let current_pos = inner.stream_position()?; let mut magic_bytes = [0u8; 4]; @@ -85,14 +85,6 @@ impl CompressReader { }) } - pub fn file_size(&self) -> io::Result { - Ok(match self { - Self::None(inner) => inner.get_ref().metadata()?.len() as usize, - //TODO: Can't see how to get the file size from ZstdReader - Self::Zstd(_inner) => 0, - }) - } - pub fn compress_type(&self) -> CompressType { match self { CompressReader::None(_) => CompressType::None, @@ -105,7 +97,17 @@ impl CompressReader { } } -impl Read for CompressReader { +impl CompressReader { + pub fn file_size(&self) -> io::Result { + Ok(match self { + Self::None(inner) => inner.get_ref().metadata()?.len() as usize, + //TODO: Can't see how to get the file size from ZstdReader + Self::Zstd(_inner) => 0, + }) + } +} + +impl Read for CompressReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { match self { CompressReader::None(inner) => inner.read(buf), @@ -114,7 +116,7 @@ impl Read for CompressReader { } } -impl io::BufRead for CompressReader { +impl io::BufRead for CompressReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { match self { CompressReader::None(inner) => inner.fill_buf(), @@ -130,7 +132,7 @@ impl io::BufRead for CompressReader { } } -impl Seek for CompressReader { +impl Seek for CompressReader { fn seek(&mut self, pos: SeekFrom) -> io::Result { match self { CompressReader::None(inner) => inner.seek(pos), diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs index 03b93eda13c..761b5761dc5 100644 --- a/crates/fs-utils/src/dir_trie.rs +++ b/crates/fs-utils/src/dir_trie.rs @@ -187,7 +187,7 @@ impl DirTrie { /// It will be decompressed based on the file's magic bytes. /// /// It will be opened with [`o_rdonly`]. - pub fn open_entry_reader(&self, file_id: &FileId) -> Result { + pub fn open_entry_reader(&self, file_id: &FileId) -> Result, io::Error> { let path = self.file_path(file_id); Self::create_parent(&path)?; CompressReader::new(o_rdonly().open(path)?) diff --git a/crates/fs-utils/src/direct_io.rs b/crates/fs-utils/src/direct_io.rs new file mode 100644 index 00000000000..c71675a7d55 --- /dev/null +++ b/crates/fs-utils/src/direct_io.rs @@ -0,0 +1,86 @@ +mod page; +mod reader; +mod writer; + +use std::{ + fs::{File, OpenOptions}, + io, + path::Path, +}; + +pub use self::{page::Page, reader::AlignedBufReader, writer::AlignedBufWriter}; + +/// Open a [File] according to [OpenOptions], enabling `O_DIRECT` or a platform +/// equivalent. +/// +/// On all supported platforms, direct I/O requires alignment of memory buffers +/// and file offsets to the logical block size of the filesystem. Wrap the +/// returned [File] in [AlignedBufReader] or [AlignedBufWriter] respectively to +/// have this being taken care of for you. +/// +/// # Platform differences +/// +/// * Unix (except macOS): +/// +/// The file will be opened with the `O_DIRECT` flag. +/// +/// * macOS: +/// +/// The `F_NOCACHE` fcntl will be set on the opened file. +/// It may be necessary to set `F_PREALLOCATE` as well[1]. +/// +/// * Windows: +/// +/// The file will be opened with [FILE_FLAG_NO_BUFFERING]. +/// +/// [1]: https://forums.developer.apple.com/forums/thread/25464 +/// [FILE_FLAG_NO_BUFFERING]: https://docs.microsoft.com/en-us/windows/win32/fileio/file-buffering +pub fn open_file(path: impl AsRef, opts: &mut OpenOptions) -> io::Result { + open_file_impl(path.as_ref(), opts) +} + +/// Open the file at `path` for reading in `O_DIRECT` mode and wrap it in an +/// [AlignedBufReader]. +pub fn file_reader(path: impl AsRef) -> io::Result> { + open_file(path, OpenOptions::new().read(true)).map(AlignedBufReader::new) +} + +/// Open the file at `path` for writing in `O_DIRECT` mode and wrap it in an +/// [AlignedBufWriter]. +/// +/// The file will be created if it does not exist, and truncated if it does. +pub fn file_writer(path: impl AsRef) -> io::Result> { + open_file(path, OpenOptions::new().create(true).write(true).truncate(true)).map(AlignedBufWriter::new) +} + +#[cfg(all(unix, not(target_os = "macos")))] +fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result { + use std::os::unix::fs::OpenOptionsExt as _; + + opts.custom_flags(libc::O_DIRECT); + opts.open(path) +} + +#[cfg(target_os = "macos")] +fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result { + use libc::{fcntl, F_NOCACHE}; + use std::os::fd::AsRawFd; + + let file = opts.open(path)?; + let fd = file.as_raw_fd(); + let ret = unsafe { fcntl(fd, F_NOCACHE, 1) }; + if ret != 0 { + return Err(io::Error::from_raw_os_error(ret)); + } + + Ok(file) +} + +#[cfg(target_os = "windows")] +fn open_file_impl(path: &Path, opts: &mut OpenOptions) -> io::Result { + use std::os::windows::fs::OpenOptionsExt; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_NO_BUFFERING; + + opts.custom_flags(FILE_FLAG_NO_BUFFERING); + opts.open(path) +} diff --git a/crates/fs-utils/src/direct_io/page.rs b/crates/fs-utils/src/direct_io/page.rs new file mode 100644 index 00000000000..39250b56c75 --- /dev/null +++ b/crates/fs-utils/src/direct_io/page.rs @@ -0,0 +1,136 @@ +use std::ops::{Deref, DerefMut}; + +/// The (assumed) size of an OS memory page: 4096 bytes. +pub const PAGE_SIZE: usize = 4096; + +/// The (assumed) size of a logical device block. +/// +/// Under Linux, this is the value returned by `blockdev --getss`. +/// +/// Although the value may differ across machines, filesystems or OSs, we +/// currently assume that it is a safe value to align I/O buffers to. +pub const BLOCK_SIZE: usize = 512; + +#[derive(Debug)] +#[repr(C, align(512))] +struct Aligned([u8; PAGE_SIZE]); + +impl Deref for Aligned { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Aligned { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A buffer of size [`PAGE_SIZE`], aligned to [`BLOCK_SIZE`]. +/// +/// The memory of the buffer is intended to be reused by manipulating its +/// (write) position (similar to a cursor). +#[derive(Debug)] +pub struct Page { + buf: Aligned, + pos: usize, +} + +impl Page { + /// Create a new page. + pub fn new() -> Self { + Self { + buf: Aligned([0; PAGE_SIZE]), + pos: 0, + } + } + + /// The current position, i.e. up to which offset the buffer is considered + /// filled. + #[inline] + pub fn pos(&self) -> usize { + self.pos + } + + /// Reset the current position to zero. + #[inline] + pub fn reset(&mut self) { + self.pos = 0; + } + + /// Set the current position to `pos`. + /// + /// The caller must ensure that `pos <= self.buf.len()`. + #[inline] + pub(super) fn set_pos(&mut self, pos: usize) { + debug_assert!(pos <= self.buf.len(), "pos > buf.len()"); + self.pos = pos; + } + + /// Return the entire underlying buffer as a slice, regardless of position. + #[inline] + pub fn buf(&self) -> &[u8] { + &self.buf + } + + /// Return the entire underlying buffer as a mutable slice, regardless of + /// position. + /// + /// The caller must ensure to update the position as necessary. + #[inline] + pub fn buf_mut(&mut self) -> &mut [u8] { + &mut self.buf + } + + /// Return the current position, rounded up to the next multiple of + /// [`BLOCK_SIZE`]. + /// + /// Never exceeds the length of the buffer. + #[inline] + pub fn next_block_offset(&self) -> usize { + self.pos.next_multiple_of(BLOCK_SIZE).min(self.buf.len()) + } + + /// Copy the given slice to the internal buffer, starting from `self.pos()`. + /// + /// The position is updated with the length of the source slice. + /// + /// # Panics + /// + /// Panics if there is not enough space to copy `src`, i.e. + /// `src.len() > self.spare_capacity()`. + #[inline] + pub fn copy_from_slice(&mut self, src: &[u8]) { + self.buf[self.pos..self.pos + src.len()].copy_from_slice(src); + self.pos += src.len(); + } + + /// `true` if the current position is zero. + #[inline] + pub const fn is_empty(&self) -> bool { + self.pos == 0 + } + + /// `true` if the buffer is full, shorthand for `self.spare_capacity() == 0`. + pub const fn is_full(&self) -> bool { + self.spare_capacity() == 0 + } + + /// Returns the number of bytes remaining in the buffer after `self.pos()`. + pub const fn spare_capacity(&self) -> usize { + self.buf.0.len() - self.pos + } + + pub const fn capacity(&self) -> usize { + PAGE_SIZE + } +} + +impl Default for Page { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/fs-utils/src/direct_io/reader.rs b/crates/fs-utils/src/direct_io/reader.rs new file mode 100644 index 00000000000..9c14c5cb5f2 --- /dev/null +++ b/crates/fs-utils/src/direct_io/reader.rs @@ -0,0 +1,117 @@ +use std::{ + cmp, + io::{self, BufRead, Read, Seek}, +}; + +use super::page::Page; + +/// A buffered reader using an aligned buffer internally. +/// +/// The alignment makes the reader suitable for files opened using `O_DIRECT` +/// or a platform equivalent. +/// +/// Other than the alignment of the buffer, this is basically a stripped down +/// version of [`io::BufReader`], borrowing much of its code. +pub struct AlignedBufReader { + inner: R, + + page: Page, + /// The number of bytes read during the last `fill_buf`. + /// + /// That is, `page.buf()[page.pos()..filled]` is the currently buffered, + /// unconsumed data. + filled: usize, +} + +impl AlignedBufReader { + /// Create a new [AlignedBufReader] wrapping the `inner` reader. + pub fn new(inner: R) -> Self { + Self { + inner, + page: Page::new(), + filled: 0, + } + } + + pub const fn from_raw_parts(inner: R, page: Page, filled: usize) -> Self { + Self { inner, page, filled } + } + + pub fn into_raw_parts(self) -> (R, Page, usize) { + (self.inner, self.page, self.filled) + } +} + +impl AlignedBufReader { + #[inline] + fn consume_with(&mut self, amt: usize, mut visitor: impl FnMut(&[u8])) -> bool { + if let Some(claimed) = self.page.buf()[self.page.pos()..self.filled].get(..amt) { + visitor(claimed); + self.consume(amt); + true + } else { + false + } + } +} + +impl Read for AlignedBufReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let mut rem = self.fill_buf()?; + let n = rem.read(buf)?; + self.consume(n); + + Ok(n) + } + + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + if self.consume_with(buf.len(), |claimed| buf.copy_from_slice(claimed)) { + return Ok(()); + } + + let mut buf = buf; + while !buf.is_empty() { + match self.read(buf) { + Ok(0) => break, + Ok(n) => { + buf = &mut buf[n..]; + } + Err(e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + + if !buf.is_empty() { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to fill whole page", + )); + } + + Ok(()) + } +} + +impl BufRead for AlignedBufReader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.page.pos() >= self.filled { + let n = self.inner.read(self.page.buf_mut())?; + self.page.reset(); + self.filled = n; + } + + Ok(&self.page.buf()[self.page.pos()..self.filled]) + } + + fn consume(&mut self, amt: usize) { + self.page.set_pos(cmp::min(self.page.pos() + amt, self.filled)); + } +} + +impl Seek for AlignedBufReader { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + self.page.reset(); + self.filled = 0; + self.inner.seek(pos) + } +} diff --git a/crates/fs-utils/src/direct_io/writer.rs b/crates/fs-utils/src/direct_io/writer.rs new file mode 100644 index 00000000000..3dacd91a532 --- /dev/null +++ b/crates/fs-utils/src/direct_io/writer.rs @@ -0,0 +1,123 @@ +use std::io; + +use super::page::Page; + +/// A buffered writer using an aligned buffer internally. +/// +/// Similar to [`io::BufWriter`], but suitable for files opened using `O_DIRECT` +/// or a platform equivalent, due to the alignment. +/// +/// # Flushing behaviour +/// +/// [`io::Write::write`] calls will only flush the buffer when it is full. +/// [`io::Write::flush`] calls, however, will flush the buffer up to the next +/// [`BLOCK_SIZE`] boundary, padding the data with zeroes if necessary. +/// +/// This is done so that partial writes to the underlying storage can be +/// detected on the application layer, and retried if appropriate. It is also +/// necessary to preserve `fsync` semantics: a [`PagedWriter`] replaces the OS +/// page cache, where the latter is flushed to the device when `fsync` is called. +/// +/// After a flush of an underfull buffer, the file and page positions are +/// rewound to the _previous_ [`BLOCK_SIZE`] boundary, such that the subsequent +/// write will overwrite the padding if more data has been added to the writer. +/// +/// Dropping a [`PagedWriter`] will attempt to flush all data, but will not sync +/// it. +#[derive(Debug)] +pub struct AlignedBufWriter { + inner: W, + page: Page, +} + +impl AlignedBufWriter { + /// Create a new `AlignedBufWriter` wrapping the `inner` writer. + pub fn new(inner: W) -> Self { + Self::from_raw_parts(inner, Page::new()) + } + + /// Create a new `AlignedBufWriter` from its constituent parts. + /// + /// This allows to reuse [`Page`]s. + pub const fn from_raw_parts(inner: W, page: Page) -> Self { + Self { inner, page } + } + + /// Get a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.inner + } +} + +impl io::Write for AlignedBufWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut wrote = 0; + let mut buf = buf; + + while !buf.is_empty() { + let (chunk, rest) = buf.split_at(self.page.spare_capacity().min(buf.len())); + self.page.copy_from_slice(chunk); + if self.page.is_full() { + self.flush()?; + } + wrote += chunk.len(); + buf = rest; + } + + Ok(wrote) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + let pos = self.page.pos(); + let next_block = self.page.next_block_offset(); + + // Pad with zeroes. + self.page.buf_mut()[pos..next_block].fill(0); + let buf = &self.page.buf()[..next_block]; + let len = buf.len(); + + self.inner.write_all(buf)?; + if pos + len > self.page.capacity() { + self.page.reset(); + } else { + self.page.set_pos(pos + len); + } + self.inner.flush()?; + + Ok(()) + } +} + +impl Drop for AlignedBufWriter { + fn drop(&mut self) { + let _ = io::Write::flush(self); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write as _; + + #[test] + fn write_flushes_aligned() { + let mut writer = AlignedBufWriter::new(Vec::new()); + writer.write_all(&[42; 4096]).unwrap(); + writer.write_all(&[1; 512]).unwrap(); + + assert_eq!(&writer.inner, &[42; 4096]) + } + + #[test] + fn flush_flushes_all_with_padding() { + let mut writer = AlignedBufWriter::new(Vec::new()); + writer.write_all(&[42; 5000]).unwrap(); + writer.flush().unwrap(); + + assert_eq!( + &writer.inner, + [[42; 5000].as_slice(), [0; 120].as_slice()].concat().as_slice() + ); + } +} diff --git a/crates/fs-utils/src/lib.rs b/crates/fs-utils/src/lib.rs index c4d1a6ba0c1..65ba44864b0 100644 --- a/crates/fs-utils/src/lib.rs +++ b/crates/fs-utils/src/lib.rs @@ -3,6 +3,7 @@ use std::path::Path; pub mod compression; pub mod dir_trie; +pub mod direct_io; pub mod lockfile; pub fn create_parent_dir(file: &Path) -> Result<(), std::io::Error> {