diff --git a/crates/commitlog/src/commit.rs b/crates/commitlog/src/commit.rs index fa42e44a109..a422f4b1e3c 100644 --- a/crates/commitlog/src/commit.rs +++ b/crates/commitlog/src/commit.rs @@ -342,30 +342,39 @@ impl StoredCommit { } } -/// Numbers needed to compute [`crate::segment::Header`]. +/// A [`StoredCommit`] sans the records payload. #[derive(Clone, Debug, Eq, PartialEq)] pub struct Metadata { pub tx_range: Range, pub size_in_bytes: u64, pub epoch: u64, + pub checksum: u32, } impl Metadata { - /// Extract the [`Metadata`] of a single [`Commit`] from the given reader. + /// Extract the [`Metadata`] of a single [`StoredCommit`] from the given + /// reader. /// /// Note that this decodes the commit due to checksum verification. - /// Like [`Commit::decode`], returns `None` if the reader is at EOF already. + /// Like [`StoredCommit::decode`], this method returns `None` if the reader + /// is at EOF already. pub fn extract(reader: R) -> io::Result> { - Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from)) + StoredCommit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from)) } } -impl From for Metadata { - fn from(commit: Commit) -> Self { +impl From for Metadata { + fn from(commit: StoredCommit) -> Self { + let tx_range = commit.tx_range(); + let epoch = commit.epoch; + let checksum = commit.checksum; + let size_in_bytes = Commit::from(commit).encoded_len() as u64; + Self { - tx_range: commit.tx_range(), - size_in_bytes: commit.encoded_len() as u64, - epoch: commit.epoch, + tx_range, + size_in_bytes, + epoch, + checksum, } } } diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..5e7d252244a 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -15,7 +15,7 @@ use crate::{ error::{self, source_chain}, index::IndexError, payload::Decoder, - repo::{self, Repo, TxOffsetIndex}, + repo::{self, Repo, SegmentLen as _, TxOffsetIndex}, segment::{self, FileLike, Transaction, Writer}, Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION, }; @@ -355,7 +355,142 @@ impl Drop for Generic { } } -/// Extract the most recently written [`segment::Metadata`] from the commitlog +/// The most recent non empty segment in repo `R`. +/// +/// Created by [open_newest_non_empty_segment]. +struct MostRecentNonEmptySegment { + /// Number of empty segments that were ignored. + empty_segments: usize, + /// Offset of the non-empty segment. + segment_offset: u64, + /// [Repo::SegmentReader] for the non-empty segment. + segment_reader: R, +} + +/// Open the most recent segment in `repo` that is larger than +/// [segment::Header::LEN]. +/// +/// Note that there should be at most one empty segment in the log. We may, +/// however, want to be lenient on this read-only path, so the number of +/// empty segments is tracked in the returned type rather than returning an +/// error. +fn open_newest_non_empty_segment(repo: R) -> io::Result>> { + let mut segments = repo.existing_offsets()?; + + let mut empty_segments = 0; + let mut segment_offset; + let mut segment_reader; + loop { + let Some(last) = segments.pop() else { + return Ok(None); + }; + segment_offset = last; + segment_reader = repo.open_segment_reader(segment_offset)?; + if segment_reader.segment_len()? > segment::Header::LEN as u64 { + break; + } else { + empty_segments += 1; + } + } + + Ok(Some(MostRecentNonEmptySegment { + empty_segments, + segment_offset, + segment_reader, + })) +} + +/// The most recently written [segment::Metadata] for a given [Repo]. +/// +/// The type preserves the error information in case the most recent segment +/// contains corrupted data at the end (typically due to a torn write). +/// +/// Created by [committed_meta]. +pub enum CommittedMeta { + /// The most recent segment could not be traversed successfully until the + /// end, i.e. there is trailing garbage in the segment. + /// + /// This variant is also returned in case [open_newest_non_empty_segment] + /// finds more than a single empty segment at the end of the log. + Prefix { + /// The metadata of the prefix that could be traversed successfully. + /// + /// It is guaranteed that the metadata spans at least one commit. + metadata: segment::Metadata, + /// The error encountered. + error: io::Error, + }, + /// The most recent segment could be traversed successfully until the end. + Complete { + /// The segment metadata. + /// + /// It is guaranteed that the metadata spans at least one commit. + metadata: segment::Metadata, + }, +} + +impl CommittedMeta { + pub fn metadata(&self) -> &segment::Metadata { + let (Self::Prefix { metadata, .. } | Self::Complete { metadata }) = self; + metadata + } + + fn extract(repo: impl Repo) -> io::Result> { + let Some(MostRecentNonEmptySegment { + empty_segments, + segment_offset, + mut segment_reader, + }) = open_newest_non_empty_segment(&repo)? + else { + return Ok(None); + }; + let offset_index = repo.get_offset_index(segment_offset).ok(); + match segment::Metadata::extract(segment_offset, &mut segment_reader, offset_index.as_ref()) { + // Segment is intact. + Ok(metadata) if empty_segments <= 1 => { + assert!( + !metadata.tx_range.is_empty(), + "segment was promised to be non-empty but contains zero transactions" + ); + Ok(Some(CommittedMeta::Complete { metadata })) + } + // Segment is good, but there are too many empty segments. + Ok(metadata) => Ok(Some(CommittedMeta::Prefix { + metadata, + error: io::Error::new( + io::ErrorKind::InvalidData, + format!("repo {}: too many empty segments: {}", repo, empty_segments), + ), + })), + // Segment is non-empty, but first commit is corrupt. + Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => { + Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "repo {}: first commit in the most recent segment is corrupt: {}", + repo, source + ), + )) + } + // Some prefix of the segment is good. + Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => Ok(Some(CommittedMeta::Prefix { + metadata: sofar, + error: source, + })), + // Something went wrong, including out-of-order errors and such. + Err(error::SegmentMetadata::Io(e)) => Err(e), + } + } +} + +impl From for segment::Metadata { + fn from(meta: CommittedMeta) -> Self { + let (CommittedMeta::Prefix { metadata, .. } | CommittedMeta::Complete { metadata }) = meta; + metadata + } +} + +/// Extract the most recently written [CommittedMeta] from the commitlog /// in `repo`. /// /// Returns `None` if the commitlog is empty. @@ -373,18 +508,12 @@ impl Drop for Generic { /// like so: /// /// ```ignore -/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end); +/// let max_offset = committed_meta(..)?.map(|meta| meta.metadata().tx_range.end); /// ``` /// /// Unlike `open`, no segment will be created in an empty `repo`. -pub fn committed_meta(repo: impl Repo) -> Result, error::SegmentMetadata> { - let Some(last) = repo.existing_offsets()?.pop() else { - return Ok(None); - }; - - let mut storage = repo.open_segment_reader(last)?; - let offset_index = repo.get_offset_index(last).ok(); - segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some) +pub fn committed_meta(repo: impl Repo) -> io::Result> { + CommittedMeta::extract(repo) } pub fn commits_from(repo: R, max_log_format_version: u8, offset: u64) -> io::Result> { diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 1e7a8c0047e..a5dc3d09c26 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -19,6 +19,7 @@ mod varint; pub use crate::{ commit::{Commit, StoredCommit}, + commitlog::CommittedMeta, payload::{Decoder, Encode}, repo::fs::SizeOnDisk, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, @@ -558,7 +559,7 @@ impl Commitlog { /// ``` /// /// Unlike `open`, no segment will be created in an empty `repo`. -pub fn committed_meta(root: CommitLogDir) -> Result, error::SegmentMetadata> { +pub fn committed_meta(root: CommitLogDir) -> io::Result> { commitlog::committed_meta(repo::Fs::new(root, None)?) } diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..57d4a9e9f92 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -258,6 +258,7 @@ pub fn resume_segment_writer( size_in_bytes, max_epoch, max_commit_offset: _, + max_commit: _, } = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) { Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => { warn!("invalid commit in segment {offset}: {source}"); diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..7598f05e1c0 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -594,6 +594,7 @@ pub struct Metadata { /// `max_commit_offset..tx_range.end` is the range of /// transactions contained in it. pub max_commit_offset: u64, + pub max_commit: Option, } impl Metadata { @@ -627,6 +628,7 @@ impl Metadata { size_in_bytes: Header::LEN as u64, max_epoch: u64::default(), max_commit_offset: min_tx_offset, + max_commit: None, }); reader.seek(SeekFrom::Start(sofar.size_in_bytes))?; @@ -663,6 +665,7 @@ impl Metadata { // TODO: Should it be an error to encounter an epoch going backwards? sofar.max_epoch = commit.epoch.max(sofar.max_epoch); sofar.max_commit_offset = commit.tx_range.start; + sofar.max_commit = Some(commit); } Ok(sofar) @@ -695,6 +698,7 @@ impl Metadata { size_in_bytes: byte_offset + commit.size_in_bytes, max_epoch: commit.epoch, max_commit_offset: commit.tx_range.start, + max_commit: Some(commit), }); } @@ -833,18 +837,33 @@ mod tests { writer.commit().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); - let metadata = reader.metadata().unwrap(); + let Metadata { + header, + tx_range, + size_in_bytes, + max_epoch, + max_commit_offset, + max_commit, + } = reader.metadata().unwrap(); assert_eq!( - metadata, - Metadata { - header: Header::default(), - tx_range: Range { start: 0, end: 5 }, + ( + header, + tx_range, + size_in_bytes, + max_epoch, + max_commit_offset, + max_commit.is_some_and(|meta| meta.tx_range == (3..5)) + ), + ( + Header::default(), + 0..5, // header + 5 txs + 3 commits - size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64, - max_epoch: Commit::DEFAULT_EPOCH, - max_commit_offset: 3 - } + (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64, + Commit::DEFAULT_EPOCH, + 3, + true + ) ); }