From 0f85fa9664f34e1bfd0b149f7d91a8b7858274b2 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 18 Feb 2026 14:21:04 +0100 Subject: [PATCH 1/4] commitlog: Improve `committed_meta` - Extends `commit::Metadata` to include the checksum - Extends `segment::Metadata` to include `Some(commit::Metadata)` containing the last commit in the segment (if there is one) - Changes `committed_meta` to: - ignore empty segments at the end of the log - try harder to provide useful metadata, even if only a prefix of the latest segment is readable This is allows to eliminate remaining `Commitlog::open` calls with the purpose of querying the latest commit (offset). `Commitlog::open` creates an empty segment if the tail of the log is corrupt, which is a non-obvious side-effect that can be confusing when debugging. It also allows to eliminate uses where the `commits_from` iterator is used to find the latest full commit. The `Commits` iterator requires the caller to handle the case of a corrupted commit at the end of the log, by advancing the iterator once more after it has yielded an error in order to check that it is exhausted, and then deciding whether to ignore the error. This is easy to forget. `committed_meta` now just does the right thing, preserving information about tail corruption for when that's useful. --- crates/commitlog/src/commit.rs | 27 ++++++---- crates/commitlog/src/commitlog.rs | 84 +++++++++++++++++++++++++++---- crates/commitlog/src/lib.rs | 3 +- crates/commitlog/src/repo/mod.rs | 29 ++++++++++- crates/commitlog/src/segment.rs | 37 ++++++++++---- 5 files changed, 150 insertions(+), 30 deletions(-) diff --git a/crates/commitlog/src/commit.rs b/crates/commitlog/src/commit.rs index fa42e44a109..a422f4b1e3c 100644 --- a/crates/commitlog/src/commit.rs +++ b/crates/commitlog/src/commit.rs @@ -342,30 +342,39 @@ impl StoredCommit { } } -/// Numbers needed to compute [`crate::segment::Header`]. +/// A [`StoredCommit`] sans the records payload. #[derive(Clone, Debug, Eq, PartialEq)] pub struct Metadata { pub tx_range: Range, pub size_in_bytes: u64, pub epoch: u64, + pub checksum: u32, } impl Metadata { - /// Extract the [`Metadata`] of a single [`Commit`] from the given reader. + /// Extract the [`Metadata`] of a single [`StoredCommit`] from the given + /// reader. /// /// Note that this decodes the commit due to checksum verification. - /// Like [`Commit::decode`], returns `None` if the reader is at EOF already. + /// Like [`StoredCommit::decode`], this method returns `None` if the reader + /// is at EOF already. pub fn extract(reader: R) -> io::Result> { - Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from)) + StoredCommit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from)) } } -impl From for Metadata { - fn from(commit: Commit) -> Self { +impl From for Metadata { + fn from(commit: StoredCommit) -> Self { + let tx_range = commit.tx_range(); + let epoch = commit.epoch; + let checksum = commit.checksum; + let size_in_bytes = Commit::from(commit).encoded_len() as u64; + Self { - tx_range: commit.tx_range(), - size_in_bytes: commit.encoded_len() as u64, - epoch: commit.epoch, + tx_range, + size_in_bytes, + epoch, + checksum, } } } diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..b569af9d63b 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -355,7 +355,77 @@ impl Drop for Generic { } } -/// Extract the most recently written [`segment::Metadata`] from the commitlog +/// The most recently written [segment::Metadata] for a given [Repo]. +/// +/// The type preserves the error information in case the most recent segment +/// contains corrupted data at the end (typically due to a torn write). +/// +/// Created by [committed_meta]. +pub enum CommittedMeta { + /// The most recent segment could not be traversed successfully until the + /// end, i.e. there is trailing garbage in the segment. + Prefix { + /// The metadata of the prefix that could be traversed successfully. + /// + /// It is guaranteed that the metadata spans at least one commit. + metadata: segment::Metadata, + /// The error encountered. + error: io::Error, + }, + /// The most recent segment could be traversed successfully until the end. + Complete { + /// The segment metadata. + /// + /// It is guaranteed that the metadata spans at least one commit. + metadata: segment::Metadata, + }, +} + +impl CommittedMeta { + pub fn metadata(&self) -> &segment::Metadata { + let (Self::Prefix { metadata, .. } | Self::Complete { metadata }) = self; + metadata + } + + fn extract(repo: impl Repo) -> io::Result> { + let Some((offset, mut storage)) = repo::open_newest_non_empty_segment(&repo)? else { + return Ok(None); + }; + let offset_index = repo.get_offset_index(offset).ok(); + match segment::Metadata::extract(offset, &mut storage, offset_index.as_ref()) { + // TODO: Should this be an `assert!` instead? + Ok(metadata) if metadata.tx_range.is_empty() => Ok(None), + // Segment is intact. + Ok(metadata) => Ok(Some(CommittedMeta::Complete { metadata })), + // Segment is non-empty, but first commit is corrupt. + Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => { + Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "repo {}: first commit in the most recent segment is corrupt: {}", + repo, source + ), + )) + } + // Some prefix of the segment is good. + Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => Ok(Some(CommittedMeta::Prefix { + metadata: sofar, + error: source, + })), + // Something went wrong, including out-of-order errors and such. + Err(error::SegmentMetadata::Io(e)) => Err(e), + } + } +} + +impl From for segment::Metadata { + fn from(meta: CommittedMeta) -> Self { + let (CommittedMeta::Prefix { metadata, .. } | CommittedMeta::Complete { metadata }) = meta; + metadata + } +} + +/// Extract the most recently written [CommittedMeta] from the commitlog /// in `repo`. /// /// Returns `None` if the commitlog is empty. @@ -373,18 +443,12 @@ impl Drop for Generic { /// like so: /// /// ```ignore -/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end); +/// let max_offset = committed_meta(..)?.map(|meta| meta.metadata().tx_range.end); /// ``` /// /// Unlike `open`, no segment will be created in an empty `repo`. -pub fn committed_meta(repo: impl Repo) -> Result, error::SegmentMetadata> { - let Some(last) = repo.existing_offsets()?.pop() else { - return Ok(None); - }; - - let mut storage = repo.open_segment_reader(last)?; - let offset_index = repo.get_offset_index(last).ok(); - segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some) +pub fn committed_meta(repo: impl Repo) -> io::Result> { + CommittedMeta::extract(repo) } pub fn commits_from(repo: R, max_log_format_version: u8, offset: u64) -> io::Result> { diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 1e7a8c0047e..a5dc3d09c26 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -19,6 +19,7 @@ mod varint; pub use crate::{ commit::{Commit, StoredCommit}, + commitlog::CommittedMeta, payload::{Decoder, Encode}, repo::fs::SizeOnDisk, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, @@ -558,7 +559,7 @@ impl Commitlog { /// ``` /// /// Unlike `open`, no segment will be created in an empty `repo`. -pub fn committed_meta(root: CommitLogDir) -> Result, error::SegmentMetadata> { +pub fn committed_meta(root: CommitLogDir) -> io::Result> { commitlog::committed_meta(repo::Fs::new(root, None)?) } diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..478bbd7b6cb 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -6,7 +6,7 @@ use crate::{ commit::Commit, error, index::{IndexFile, IndexFileMut}, - segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, + segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, Options, }; @@ -258,6 +258,7 @@ pub fn resume_segment_writer( size_in_bytes, max_epoch, max_commit_offset: _, + max_commit: _, } = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) { Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => { warn!("invalid commit in segment {offset}: {source}"); @@ -314,6 +315,32 @@ pub fn open_segment_reader( Reader::new(max_log_format_version, offset, storage) } +/// Open the newest, non-empty segment in `repo`. +/// +/// A segment is considered non-empty if its size is greater than +/// [segment::Header::LEN]. +/// +/// Returns the segment offset alongside the raw [Repo::SegmentReader], +/// or `None` if the repo is empty (i.e. contains no non-empty segment). +pub(crate) fn open_newest_non_empty_segment(repo: &R) -> io::Result> { + let mut segments = repo.existing_offsets()?; + + let mut newest; + let mut reader; + loop { + let Some(last) = segments.pop() else { + return Ok(None); + }; + newest = last; + reader = repo.open_segment_reader(newest)?; + if reader.segment_len()? > segment::Header::LEN as u64 { + break; + } + } + + Ok(Some((newest, reader))) +} + /// Allocate [Options::max_segment_size] of space for [FileLike] /// if the `fallocate` feature is enabled, /// and [Options::preallocate_segments] is `true`. diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..7598f05e1c0 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -594,6 +594,7 @@ pub struct Metadata { /// `max_commit_offset..tx_range.end` is the range of /// transactions contained in it. pub max_commit_offset: u64, + pub max_commit: Option, } impl Metadata { @@ -627,6 +628,7 @@ impl Metadata { size_in_bytes: Header::LEN as u64, max_epoch: u64::default(), max_commit_offset: min_tx_offset, + max_commit: None, }); reader.seek(SeekFrom::Start(sofar.size_in_bytes))?; @@ -663,6 +665,7 @@ impl Metadata { // TODO: Should it be an error to encounter an epoch going backwards? sofar.max_epoch = commit.epoch.max(sofar.max_epoch); sofar.max_commit_offset = commit.tx_range.start; + sofar.max_commit = Some(commit); } Ok(sofar) @@ -695,6 +698,7 @@ impl Metadata { size_in_bytes: byte_offset + commit.size_in_bytes, max_epoch: commit.epoch, max_commit_offset: commit.tx_range.start, + max_commit: Some(commit), }); } @@ -833,18 +837,33 @@ mod tests { writer.commit().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); - let metadata = reader.metadata().unwrap(); + let Metadata { + header, + tx_range, + size_in_bytes, + max_epoch, + max_commit_offset, + max_commit, + } = reader.metadata().unwrap(); assert_eq!( - metadata, - Metadata { - header: Header::default(), - tx_range: Range { start: 0, end: 5 }, + ( + header, + tx_range, + size_in_bytes, + max_epoch, + max_commit_offset, + max_commit.is_some_and(|meta| meta.tx_range == (3..5)) + ), + ( + Header::default(), + 0..5, // header + 5 txs + 3 commits - size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64, - max_epoch: Commit::DEFAULT_EPOCH, - max_commit_offset: 3 - } + (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64, + Commit::DEFAULT_EPOCH, + 3, + true + ) ); } From de661d9146a9aeeafb6400d3d65277f2843b8673 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 23 Feb 2026 11:29:27 +0100 Subject: [PATCH 2/4] Change additional non-empty check to assert --- crates/commitlog/src/commitlog.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index b569af9d63b..084e00090b8 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -393,10 +393,14 @@ impl CommittedMeta { }; let offset_index = repo.get_offset_index(offset).ok(); match segment::Metadata::extract(offset, &mut storage, offset_index.as_ref()) { - // TODO: Should this be an `assert!` instead? - Ok(metadata) if metadata.tx_range.is_empty() => Ok(None), // Segment is intact. - Ok(metadata) => Ok(Some(CommittedMeta::Complete { metadata })), + Ok(metadata) => { + assert!( + !metadata.tx_range.is_empty(), + "segment was promised to be non-empty but contains zero transactions" + ); + Ok(Some(CommittedMeta::Complete { metadata })) + } // Segment is non-empty, but first commit is corrupt. Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => { Err(io::Error::new( From 74b510e2e257ec3123969a99757279345c88644e Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 23 Feb 2026 12:29:55 +0100 Subject: [PATCH 3/4] Map >1 empty segment to `CommittedMeta::Partial` variant --- crates/commitlog/src/commitlog.rs | 71 ++++++++++++++++++++++++++++--- crates/commitlog/src/repo/mod.rs | 26 ----------- 2 files changed, 66 insertions(+), 31 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 084e00090b8..5e7d252244a 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -15,7 +15,7 @@ use crate::{ error::{self, source_chain}, index::IndexError, payload::Decoder, - repo::{self, Repo, TxOffsetIndex}, + repo::{self, Repo, SegmentLen as _, TxOffsetIndex}, segment::{self, FileLike, Transaction, Writer}, Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION, }; @@ -355,6 +355,51 @@ impl Drop for Generic { } } +/// The most recent non empty segment in repo `R`. +/// +/// Created by [open_newest_non_empty_segment]. +struct MostRecentNonEmptySegment { + /// Number of empty segments that were ignored. + empty_segments: usize, + /// Offset of the non-empty segment. + segment_offset: u64, + /// [Repo::SegmentReader] for the non-empty segment. + segment_reader: R, +} + +/// Open the most recent segment in `repo` that is larger than +/// [segment::Header::LEN]. +/// +/// Note that there should be at most one empty segment in the log. We may, +/// however, want to be lenient on this read-only path, so the number of +/// empty segments is tracked in the returned type rather than returning an +/// error. +fn open_newest_non_empty_segment(repo: R) -> io::Result>> { + let mut segments = repo.existing_offsets()?; + + let mut empty_segments = 0; + let mut segment_offset; + let mut segment_reader; + loop { + let Some(last) = segments.pop() else { + return Ok(None); + }; + segment_offset = last; + segment_reader = repo.open_segment_reader(segment_offset)?; + if segment_reader.segment_len()? > segment::Header::LEN as u64 { + break; + } else { + empty_segments += 1; + } + } + + Ok(Some(MostRecentNonEmptySegment { + empty_segments, + segment_offset, + segment_reader, + })) +} + /// The most recently written [segment::Metadata] for a given [Repo]. /// /// The type preserves the error information in case the most recent segment @@ -364,6 +409,9 @@ impl Drop for Generic { pub enum CommittedMeta { /// The most recent segment could not be traversed successfully until the /// end, i.e. there is trailing garbage in the segment. + /// + /// This variant is also returned in case [open_newest_non_empty_segment] + /// finds more than a single empty segment at the end of the log. Prefix { /// The metadata of the prefix that could be traversed successfully. /// @@ -388,19 +436,32 @@ impl CommittedMeta { } fn extract(repo: impl Repo) -> io::Result> { - let Some((offset, mut storage)) = repo::open_newest_non_empty_segment(&repo)? else { + let Some(MostRecentNonEmptySegment { + empty_segments, + segment_offset, + mut segment_reader, + }) = open_newest_non_empty_segment(&repo)? + else { return Ok(None); }; - let offset_index = repo.get_offset_index(offset).ok(); - match segment::Metadata::extract(offset, &mut storage, offset_index.as_ref()) { + let offset_index = repo.get_offset_index(segment_offset).ok(); + match segment::Metadata::extract(segment_offset, &mut segment_reader, offset_index.as_ref()) { // Segment is intact. - Ok(metadata) => { + Ok(metadata) if empty_segments <= 1 => { assert!( !metadata.tx_range.is_empty(), "segment was promised to be non-empty but contains zero transactions" ); Ok(Some(CommittedMeta::Complete { metadata })) } + // Segment is good, but there are too many empty segments. + Ok(metadata) => Ok(Some(CommittedMeta::Prefix { + metadata, + error: io::Error::new( + io::ErrorKind::InvalidData, + format!("repo {}: too many empty segments: {}", repo, empty_segments), + ), + })), // Segment is non-empty, but first commit is corrupt. Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => { Err(io::Error::new( diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 478bbd7b6cb..649d1b026c2 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -315,32 +315,6 @@ pub fn open_segment_reader( Reader::new(max_log_format_version, offset, storage) } -/// Open the newest, non-empty segment in `repo`. -/// -/// A segment is considered non-empty if its size is greater than -/// [segment::Header::LEN]. -/// -/// Returns the segment offset alongside the raw [Repo::SegmentReader], -/// or `None` if the repo is empty (i.e. contains no non-empty segment). -pub(crate) fn open_newest_non_empty_segment(repo: &R) -> io::Result> { - let mut segments = repo.existing_offsets()?; - - let mut newest; - let mut reader; - loop { - let Some(last) = segments.pop() else { - return Ok(None); - }; - newest = last; - reader = repo.open_segment_reader(newest)?; - if reader.segment_len()? > segment::Header::LEN as u64 { - break; - } - } - - Ok(Some((newest, reader))) -} - /// Allocate [Options::max_segment_size] of space for [FileLike] /// if the `fallocate` feature is enabled, /// and [Options::preallocate_segments] is `true`. From bf8105b07fbd118d9b0aa6db861ce22f4570c4ab Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 23 Feb 2026 12:35:12 +0100 Subject: [PATCH 4/4] Unused import --- crates/commitlog/src/repo/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 649d1b026c2..57d4a9e9f92 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -6,7 +6,7 @@ use crate::{ commit::Commit, error, index::{IndexFile, IndexFileMut}, - segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, + segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, Options, };