Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

### Added

- `warp-core` now exposes WAL projection fact records for `WalRoot`,
`WalWriterEpoch`, `WalSegmentRef`, `WalCommitAnchor`, and
`RecoveryCertificateRef`; `WalSegmentRef::identity_digest()` binds writer
epoch, LSN range, commit chain, segment digest, commit anchors, and seal
posture while excluding storage locators from causal projection identity.
- `cargo xtask test-slice durable-runtime-wal` now runs the release-grade
filesystem runtime WAL durability gate, joining filesystem ACK recovery,
filesystem failure atomicity, CLI submission posture JSON, stale-claim, and
Expand Down
356 changes: 356 additions & 0 deletions crates/warp-core/src/causal_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const WAL_RECOVERED_INDEX_ROOT_DOMAIN: &[u8] = b"echo:causal_wal:recovered_index
const WAL_HEADER_CHECKSUM_DOMAIN: &[u8] = b"echo:causal_wal:header_checksum:v1\0";
const WAL_FRAME_CHECKSUM_DOMAIN: &[u8] = b"echo:causal_wal:frame_checksum:v1\0";
const WAL_DISK_RECORD_DOMAIN: &[u8] = b"echo:causal_wal:disk_record:v1\0";
const WAL_PROJECTION_ROOT_DOMAIN: &[u8] = b"echo:causal_wal:projection:root:v1\0";
const WAL_PROJECTION_WRITER_EPOCH_DOMAIN: &[u8] = b"echo:causal_wal:projection:writer_epoch:v1\0";
const WAL_PROJECTION_SEGMENT_DOMAIN: &[u8] = b"echo:causal_wal:projection:segment:v1\0";
const WAL_PROJECTION_COMMIT_ANCHOR_DOMAIN: &[u8] = b"echo:causal_wal:projection:commit_anchor:v1\0";
const WAL_PROJECTION_RECOVERY_CERTIFICATE_DOMAIN: &[u8] =
b"echo:causal_wal:projection:recovery_certificate:v1\0";
const CHECKPOINT_FILE_MAGIC: &[u8; 8] = b"ECWALCP1";
const WAL_SEGMENT_RECORD_MAGIC: &[u8; 8] = b"ECWALR1!";
const WAL_SEGMENTS_DIR: &str = "segments";
Expand Down Expand Up @@ -2597,6 +2603,298 @@ pub struct RecoveryCertificate {
pub recovered_indexes_root: Hash,
}

/// Read-model projection of a WAL root.
///
/// This record is evidence about committed WAL history. It is not a
/// [`WalStorePort`], does not grant storage authority, and intentionally carries
/// only typed references suitable for graph projection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WalRoot {
/// Projection root digest.
pub root_digest: Hash,
/// Writer epochs covered by this projection root.
pub writer_epochs: Vec<WalWriterEpoch>,
/// Segment references covered by this projection root.
pub segments: Vec<WalSegmentRef>,
/// Recovery certificate reference, if recovery produced one.
pub recovery_certificate: Option<RecoveryCertificateRef>,
}

impl WalRoot {
/// Computes a stable read-model identity digest for the projected WAL root.
///
/// Segment storage locators are excluded from this digest by delegating to
/// [`WalSegmentRef::identity_digest`].
#[must_use]
pub fn identity_digest(&self) -> Hash {
let mut h = blake3::Hasher::new();
h.update(WAL_PROJECTION_ROOT_DOMAIN);
h.update(&self.root_digest);
let mut writer_epoch_digests = self
.writer_epochs
.iter()
.map(WalWriterEpoch::identity_digest)
.collect::<Vec<_>>();
writer_epoch_digests.sort_unstable();
h.update(&len_u64(writer_epoch_digests.len()).to_le_bytes());
for writer_epoch_digest in &writer_epoch_digests {
h.update(writer_epoch_digest);
}

let mut segment_digests = self
.segments
.iter()
.map(WalSegmentRef::identity_digest)
.collect::<Vec<_>>();
segment_digests.sort_unstable();
h.update(&len_u64(segment_digests.len()).to_le_bytes());
for segment_digest in &segment_digests {
h.update(segment_digest);
}
update_optional_projection_digest(
&mut h,
self.recovery_certificate
.as_ref()
.map(RecoveryCertificateRef::identity_digest),
);
h.finalize().into()
}
}

/// Read-model projection of WAL writer epoch metadata.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WalWriterEpoch {
/// Epoch id.
pub epoch_id: WriterEpochId,
/// Storage fencing token or lease token.
pub storage_fencing_token: Hash,
/// Process identity evidence.
pub process_identity: Hash,
/// Host identity evidence.
pub host_identity: Hash,
/// First LSN owned by the epoch.
pub started_at_lsn: Lsn,
/// Previous epoch id, if any.
pub previous_epoch_id: Option<WriterEpochId>,
/// Previous epoch final commit digest, if any.
pub previous_epoch_final_commit_digest: Option<Hash>,
/// Lease or lock evidence.
pub lease_or_lock_evidence: Hash,
}

impl WalWriterEpoch {
/// Builds a projection record from writer-epoch evidence.
#[must_use]
pub fn from_writer_epoch(epoch: &WriterEpoch) -> Self {
Self {
epoch_id: epoch.epoch_id,
storage_fencing_token: epoch.storage_fencing_token,
process_identity: epoch.process_identity,
host_identity: epoch.host_identity,
started_at_lsn: epoch.started_at_lsn,
previous_epoch_id: epoch.previous_epoch_id,
previous_epoch_final_commit_digest: epoch.previous_epoch_final_commit_digest,
lease_or_lock_evidence: epoch.lease_or_lock_evidence,
}
}

/// Computes a stable read-model identity digest for this writer epoch.
#[must_use]
pub fn identity_digest(&self) -> Hash {
let mut h = blake3::Hasher::new();
h.update(WAL_PROJECTION_WRITER_EPOCH_DOMAIN);
h.update(&self.epoch_id.as_hash());
h.update(&self.storage_fencing_token);
h.update(&self.process_identity);
h.update(&self.host_identity);
h.update(&self.started_at_lsn.as_u64().to_le_bytes());
update_optional_writer_epoch_id(&mut h, self.previous_epoch_id);
update_optional_projection_digest(&mut h, self.previous_epoch_final_commit_digest);
h.update(&self.lease_or_lock_evidence);
h.finalize().into()
}
}

/// Commit anchor retained by a WAL segment projection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WalCommitAnchor {
/// Transaction id covered by the anchor.
pub transaction_id: WalTransactionId,
/// Commit digest covered by the anchor.
pub commit_digest: Hash,
/// First LSN covered by the committed transaction.
pub first_lsn: Lsn,
/// Last LSN covered by the committed transaction.
pub last_lsn: Lsn,
/// Number of records covered by the committed transaction.
pub record_count: u64,
}

impl WalCommitAnchor {
/// Builds a commit anchor from a committed transaction marker.
#[must_use]
pub fn from_commit(commit: &WalTransactionCommit) -> Self {
Self {
transaction_id: commit.transaction_id,
commit_digest: commit.commit_digest,
first_lsn: commit.first_lsn,
last_lsn: commit.last_lsn,
record_count: commit.record_count,
}
}

/// Computes a stable identity digest for this commit anchor.
#[must_use]
pub fn identity_digest(&self) -> Hash {
let mut h = blake3::Hasher::new();
h.update(WAL_PROJECTION_COMMIT_ANCHOR_DOMAIN);
h.update(&self.transaction_id.as_hash());
h.update(&self.commit_digest);
h.update(&self.first_lsn.as_u64().to_le_bytes());
h.update(&self.last_lsn.as_u64().to_le_bytes());
h.update(&self.record_count.to_le_bytes());
h.finalize().into()
}
}

/// WAL segment sealing posture projected into the read model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WalSegmentSealPosture {
/// Segment remains open and must not be treated as sealed evidence.
Open,
/// Segment was sealed through WAL store evidence.
Sealed {
/// Last LSN included in the sealed segment, if any.
sealed_lsn: Option<Lsn>,
},
}

impl WalSegmentSealPosture {
fn code(&self) -> u8 {
match self {
Self::Open => 0,
Self::Sealed { .. } => 1,
}
}
}

/// Storage locator metadata for a projected WAL segment.
///
/// Locators are transport and operator metadata. They are intentionally excluded
/// from [`WalSegmentRef::identity_digest`] so moving a segment between absolute
/// filesystem roots cannot alter causal projection identity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WalSegmentStorageLocator {
/// Repository- or root-relative locator.
RelativePath(PathBuf),
/// Absolute local filesystem locator.
AbsolutePath(PathBuf),
}

/// Read-model projection reference to one WAL segment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WalSegmentRef {
/// Writer epoch that owns this segment.
pub writer_epoch: WriterEpochId,
/// Logical WAL segment id.
pub segment_id: WalSegmentId,
/// First LSN covered by the segment.
pub first_lsn: Lsn,
/// Last LSN covered by the segment.
pub last_lsn: Lsn,
/// Commit digest preceding the segment's committed chain.
pub previous_commit_digest: Hash,
/// Final commit digest observed for the segment.
pub final_commit_digest: Hash,
/// Digest of the segment contents.
pub segment_digest: Hash,
/// Commit anchors covered by the segment.
pub commit_anchors: Vec<WalCommitAnchor>,
/// Segment sealing posture.
pub seal_posture: WalSegmentSealPosture,
/// Optional storage locator metadata, excluded from identity.
pub storage_locator: Option<WalSegmentStorageLocator>,
}

impl WalSegmentRef {
/// Computes a stable read-model identity digest for the segment reference.
///
/// The digest includes writer epoch, LSN range, commit-digest chain, segment
/// digest, commit anchors, and sealing posture. It deliberately excludes
/// [`Self::storage_locator`].
#[must_use]
pub fn identity_digest(&self) -> Hash {
let mut h = blake3::Hasher::new();
h.update(WAL_PROJECTION_SEGMENT_DOMAIN);
h.update(&self.writer_epoch.as_hash());
h.update(&self.segment_id.as_u64().to_le_bytes());
h.update(&self.first_lsn.as_u64().to_le_bytes());
h.update(&self.last_lsn.as_u64().to_le_bytes());
h.update(&self.previous_commit_digest);
h.update(&self.final_commit_digest);
h.update(&self.segment_digest);

let mut anchors = self.commit_anchors.clone();
anchors.sort_by_key(|anchor| {
(
anchor.first_lsn,
anchor.last_lsn,
anchor.transaction_id,
anchor.commit_digest,
)
});
h.update(&len_u64(anchors.len()).to_le_bytes());
for anchor in &anchors {
h.update(&anchor.identity_digest());
}

h.update(&[self.seal_posture.code()]);
match &self.seal_posture {
WalSegmentSealPosture::Open => {}
WalSegmentSealPosture::Sealed { sealed_lsn } => {
update_optional_lsn(&mut h, *sealed_lsn);
}
}
h.finalize().into()
}
}

/// Read-model projection reference to a recovery certificate.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecoveryCertificateRef {
/// Recovery certificate digest.
pub certificate_digest: Hash,
/// Checkpoint digest used as replay base, if any.
pub checkpoint_used: Option<Hash>,
/// First scanned committed LSN.
pub first_lsn: Option<Lsn>,
/// Last scanned committed LSN.
pub last_lsn: Option<Lsn>,
/// Tail posture observed during recovery.
pub tail_posture: RecoveryTailPosture,
/// Final frontier root.
pub recovered_frontier_root: Hash,
/// Final index root.
pub recovered_indexes_root: Hash,
}

impl RecoveryCertificateRef {
/// Computes a stable identity digest for this recovery-certificate reference.
#[must_use]
pub fn identity_digest(&self) -> Hash {
let mut h = blake3::Hasher::new();
h.update(WAL_PROJECTION_RECOVERY_CERTIFICATE_DOMAIN);
h.update(&self.certificate_digest);
update_optional_projection_digest(&mut h, self.checkpoint_used);
update_optional_lsn(&mut h, self.first_lsn);
update_optional_lsn(&mut h, self.last_lsn);
update_recovery_tail_posture(&mut h, self.tail_posture);
h.update(&self.recovered_frontier_root);
h.update(&self.recovered_indexes_root);
h.finalize().into()
}
}

/// Read-only WAL doctor posture.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WalDoctorPosture {
Expand Down Expand Up @@ -5195,6 +5493,64 @@ fn update_len_prefixed(hasher: &mut blake3::Hasher, bytes: &[u8]) {
hasher.update(bytes);
}

fn update_optional_projection_digest(hasher: &mut blake3::Hasher, digest: Option<Hash>) {
match digest {
Some(digest) => {
hasher.update(&[1]);
hasher.update(&digest);
}
None => {
hasher.update(&[0]);
}
}
}

fn update_optional_writer_epoch_id(hasher: &mut blake3::Hasher, epoch_id: Option<WriterEpochId>) {
match epoch_id {
Some(epoch_id) => {
hasher.update(&[1]);
hasher.update(&epoch_id.as_hash());
}
None => {
hasher.update(&[0]);
}
}
}

fn update_optional_lsn(hasher: &mut blake3::Hasher, lsn: Option<Lsn>) {
match lsn {
Some(lsn) => {
hasher.update(&[1]);
hasher.update(&lsn.as_u64().to_le_bytes());
}
None => {
hasher.update(&[0]);
}
}
}

fn update_recovery_tail_posture(hasher: &mut blake3::Hasher, posture: RecoveryTailPosture) {
match posture {
RecoveryTailPosture::Clean => {
hasher.update(&[0]);
}
RecoveryTailPosture::TruncatedAll => {
hasher.update(&[1]);
}
RecoveryTailPosture::TruncatedAfter(lsn) => {
hasher.update(&[2]);
hasher.update(&lsn.as_u64().to_le_bytes());
}
RecoveryTailPosture::WouldTruncateAll => {
hasher.update(&[3]);
}
RecoveryTailPosture::WouldTruncateAfter(lsn) => {
hasher.update(&[4]);
hasher.update(&lsn.as_u64().to_le_bytes());
}
}
}

fn push_hash(out: &mut Vec<u8>, hash: &Hash) {
out.extend_from_slice(hash);
}
Expand Down
Loading
Loading