From e623377558cb7519dcda267a88eac81b18a6e281 Mon Sep 17 00:00:00 2001 From: James Ross Date: Sun, 21 Jun 2026 15:21:25 -0700 Subject: [PATCH] feat(warp-core): add WAL projection fact types --- CHANGELOG.md | 5 + crates/warp-core/src/causal_wal.rs | 356 +++++++++++++++++++++ crates/warp-core/tests/causal_wal_tests.rs | 142 +++++++- docs/BEARING.md | 10 + 4 files changed, 507 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aaf76bb7..4f9d21c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ ### Added +- `warp-core` now exposes WAL projection fact records for `WalRoot`, + `WalWriterEpoch`, `WalSegmentRef`, `WalCommitAnchor`, and + `RecoveryCertificateRef`; `WalSegmentRef::identity_digest()` binds writer + epoch, LSN range, commit chain, segment digest, commit anchors, and seal + posture while excluding storage locators from causal projection identity. - `cargo xtask test-slice durable-runtime-wal` now runs the release-grade filesystem runtime WAL durability gate, joining filesystem ACK recovery, filesystem failure atomicity, CLI submission posture JSON, stale-claim, and diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index 9b7ec690..0caa9cb4 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -31,6 +31,12 @@ const WAL_RECOVERED_INDEX_ROOT_DOMAIN: &[u8] = b"echo:causal_wal:recovered_index const WAL_HEADER_CHECKSUM_DOMAIN: &[u8] = b"echo:causal_wal:header_checksum:v1\0"; const WAL_FRAME_CHECKSUM_DOMAIN: &[u8] = b"echo:causal_wal:frame_checksum:v1\0"; const WAL_DISK_RECORD_DOMAIN: &[u8] = b"echo:causal_wal:disk_record:v1\0"; +const WAL_PROJECTION_ROOT_DOMAIN: &[u8] = b"echo:causal_wal:projection:root:v1\0"; +const WAL_PROJECTION_WRITER_EPOCH_DOMAIN: &[u8] = b"echo:causal_wal:projection:writer_epoch:v1\0"; +const WAL_PROJECTION_SEGMENT_DOMAIN: &[u8] = b"echo:causal_wal:projection:segment:v1\0"; +const WAL_PROJECTION_COMMIT_ANCHOR_DOMAIN: &[u8] = b"echo:causal_wal:projection:commit_anchor:v1\0"; +const WAL_PROJECTION_RECOVERY_CERTIFICATE_DOMAIN: &[u8] = + b"echo:causal_wal:projection:recovery_certificate:v1\0"; const CHECKPOINT_FILE_MAGIC: &[u8; 8] = b"ECWALCP1"; const WAL_SEGMENT_RECORD_MAGIC: &[u8; 8] = b"ECWALR1!"; const WAL_SEGMENTS_DIR: &str = "segments"; @@ -2597,6 +2603,298 @@ pub struct RecoveryCertificate { pub recovered_indexes_root: Hash, } +/// Read-model projection of a WAL root. +/// +/// This record is evidence about committed WAL history. It is not a +/// [`WalStorePort`], does not grant storage authority, and intentionally carries +/// only typed references suitable for graph projection. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WalRoot { + /// Projection root digest. + pub root_digest: Hash, + /// Writer epochs covered by this projection root. + pub writer_epochs: Vec, + /// Segment references covered by this projection root. + pub segments: Vec, + /// Recovery certificate reference, if recovery produced one. + pub recovery_certificate: Option, +} + +impl WalRoot { + /// Computes a stable read-model identity digest for the projected WAL root. + /// + /// Segment storage locators are excluded from this digest by delegating to + /// [`WalSegmentRef::identity_digest`]. + #[must_use] + pub fn identity_digest(&self) -> Hash { + let mut h = blake3::Hasher::new(); + h.update(WAL_PROJECTION_ROOT_DOMAIN); + h.update(&self.root_digest); + let mut writer_epoch_digests = self + .writer_epochs + .iter() + .map(WalWriterEpoch::identity_digest) + .collect::>(); + writer_epoch_digests.sort_unstable(); + h.update(&len_u64(writer_epoch_digests.len()).to_le_bytes()); + for writer_epoch_digest in &writer_epoch_digests { + h.update(writer_epoch_digest); + } + + let mut segment_digests = self + .segments + .iter() + .map(WalSegmentRef::identity_digest) + .collect::>(); + segment_digests.sort_unstable(); + h.update(&len_u64(segment_digests.len()).to_le_bytes()); + for segment_digest in &segment_digests { + h.update(segment_digest); + } + update_optional_projection_digest( + &mut h, + self.recovery_certificate + .as_ref() + .map(RecoveryCertificateRef::identity_digest), + ); + h.finalize().into() + } +} + +/// Read-model projection of WAL writer epoch metadata. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WalWriterEpoch { + /// Epoch id. + pub epoch_id: WriterEpochId, + /// Storage fencing token or lease token. + pub storage_fencing_token: Hash, + /// Process identity evidence. + pub process_identity: Hash, + /// Host identity evidence. + pub host_identity: Hash, + /// First LSN owned by the epoch. + pub started_at_lsn: Lsn, + /// Previous epoch id, if any. + pub previous_epoch_id: Option, + /// Previous epoch final commit digest, if any. + pub previous_epoch_final_commit_digest: Option, + /// Lease or lock evidence. + pub lease_or_lock_evidence: Hash, +} + +impl WalWriterEpoch { + /// Builds a projection record from writer-epoch evidence. + #[must_use] + pub fn from_writer_epoch(epoch: &WriterEpoch) -> Self { + Self { + epoch_id: epoch.epoch_id, + storage_fencing_token: epoch.storage_fencing_token, + process_identity: epoch.process_identity, + host_identity: epoch.host_identity, + started_at_lsn: epoch.started_at_lsn, + previous_epoch_id: epoch.previous_epoch_id, + previous_epoch_final_commit_digest: epoch.previous_epoch_final_commit_digest, + lease_or_lock_evidence: epoch.lease_or_lock_evidence, + } + } + + /// Computes a stable read-model identity digest for this writer epoch. + #[must_use] + pub fn identity_digest(&self) -> Hash { + let mut h = blake3::Hasher::new(); + h.update(WAL_PROJECTION_WRITER_EPOCH_DOMAIN); + h.update(&self.epoch_id.as_hash()); + h.update(&self.storage_fencing_token); + h.update(&self.process_identity); + h.update(&self.host_identity); + h.update(&self.started_at_lsn.as_u64().to_le_bytes()); + update_optional_writer_epoch_id(&mut h, self.previous_epoch_id); + update_optional_projection_digest(&mut h, self.previous_epoch_final_commit_digest); + h.update(&self.lease_or_lock_evidence); + h.finalize().into() + } +} + +/// Commit anchor retained by a WAL segment projection. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WalCommitAnchor { + /// Transaction id covered by the anchor. + pub transaction_id: WalTransactionId, + /// Commit digest covered by the anchor. + pub commit_digest: Hash, + /// First LSN covered by the committed transaction. + pub first_lsn: Lsn, + /// Last LSN covered by the committed transaction. + pub last_lsn: Lsn, + /// Number of records covered by the committed transaction. + pub record_count: u64, +} + +impl WalCommitAnchor { + /// Builds a commit anchor from a committed transaction marker. + #[must_use] + pub fn from_commit(commit: &WalTransactionCommit) -> Self { + Self { + transaction_id: commit.transaction_id, + commit_digest: commit.commit_digest, + first_lsn: commit.first_lsn, + last_lsn: commit.last_lsn, + record_count: commit.record_count, + } + } + + /// Computes a stable identity digest for this commit anchor. + #[must_use] + pub fn identity_digest(&self) -> Hash { + let mut h = blake3::Hasher::new(); + h.update(WAL_PROJECTION_COMMIT_ANCHOR_DOMAIN); + h.update(&self.transaction_id.as_hash()); + h.update(&self.commit_digest); + h.update(&self.first_lsn.as_u64().to_le_bytes()); + h.update(&self.last_lsn.as_u64().to_le_bytes()); + h.update(&self.record_count.to_le_bytes()); + h.finalize().into() + } +} + +/// WAL segment sealing posture projected into the read model. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WalSegmentSealPosture { + /// Segment remains open and must not be treated as sealed evidence. + Open, + /// Segment was sealed through WAL store evidence. + Sealed { + /// Last LSN included in the sealed segment, if any. + sealed_lsn: Option, + }, +} + +impl WalSegmentSealPosture { + fn code(&self) -> u8 { + match self { + Self::Open => 0, + Self::Sealed { .. } => 1, + } + } +} + +/// Storage locator metadata for a projected WAL segment. +/// +/// Locators are transport and operator metadata. They are intentionally excluded +/// from [`WalSegmentRef::identity_digest`] so moving a segment between absolute +/// filesystem roots cannot alter causal projection identity. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WalSegmentStorageLocator { + /// Repository- or root-relative locator. + RelativePath(PathBuf), + /// Absolute local filesystem locator. + AbsolutePath(PathBuf), +} + +/// Read-model projection reference to one WAL segment. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WalSegmentRef { + /// Writer epoch that owns this segment. + pub writer_epoch: WriterEpochId, + /// Logical WAL segment id. + pub segment_id: WalSegmentId, + /// First LSN covered by the segment. + pub first_lsn: Lsn, + /// Last LSN covered by the segment. + pub last_lsn: Lsn, + /// Commit digest preceding the segment's committed chain. + pub previous_commit_digest: Hash, + /// Final commit digest observed for the segment. + pub final_commit_digest: Hash, + /// Digest of the segment contents. + pub segment_digest: Hash, + /// Commit anchors covered by the segment. + pub commit_anchors: Vec, + /// Segment sealing posture. + pub seal_posture: WalSegmentSealPosture, + /// Optional storage locator metadata, excluded from identity. + pub storage_locator: Option, +} + +impl WalSegmentRef { + /// Computes a stable read-model identity digest for the segment reference. + /// + /// The digest includes writer epoch, LSN range, commit-digest chain, segment + /// digest, commit anchors, and sealing posture. It deliberately excludes + /// [`Self::storage_locator`]. + #[must_use] + pub fn identity_digest(&self) -> Hash { + let mut h = blake3::Hasher::new(); + h.update(WAL_PROJECTION_SEGMENT_DOMAIN); + h.update(&self.writer_epoch.as_hash()); + h.update(&self.segment_id.as_u64().to_le_bytes()); + h.update(&self.first_lsn.as_u64().to_le_bytes()); + h.update(&self.last_lsn.as_u64().to_le_bytes()); + h.update(&self.previous_commit_digest); + h.update(&self.final_commit_digest); + h.update(&self.segment_digest); + + let mut anchors = self.commit_anchors.clone(); + anchors.sort_by_key(|anchor| { + ( + anchor.first_lsn, + anchor.last_lsn, + anchor.transaction_id, + anchor.commit_digest, + ) + }); + h.update(&len_u64(anchors.len()).to_le_bytes()); + for anchor in &anchors { + h.update(&anchor.identity_digest()); + } + + h.update(&[self.seal_posture.code()]); + match &self.seal_posture { + WalSegmentSealPosture::Open => {} + WalSegmentSealPosture::Sealed { sealed_lsn } => { + update_optional_lsn(&mut h, *sealed_lsn); + } + } + h.finalize().into() + } +} + +/// Read-model projection reference to a recovery certificate. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RecoveryCertificateRef { + /// Recovery certificate digest. + pub certificate_digest: Hash, + /// Checkpoint digest used as replay base, if any. + pub checkpoint_used: Option, + /// First scanned committed LSN. + pub first_lsn: Option, + /// Last scanned committed LSN. + pub last_lsn: Option, + /// Tail posture observed during recovery. + pub tail_posture: RecoveryTailPosture, + /// Final frontier root. + pub recovered_frontier_root: Hash, + /// Final index root. + pub recovered_indexes_root: Hash, +} + +impl RecoveryCertificateRef { + /// Computes a stable identity digest for this recovery-certificate reference. + #[must_use] + pub fn identity_digest(&self) -> Hash { + let mut h = blake3::Hasher::new(); + h.update(WAL_PROJECTION_RECOVERY_CERTIFICATE_DOMAIN); + h.update(&self.certificate_digest); + update_optional_projection_digest(&mut h, self.checkpoint_used); + update_optional_lsn(&mut h, self.first_lsn); + update_optional_lsn(&mut h, self.last_lsn); + update_recovery_tail_posture(&mut h, self.tail_posture); + h.update(&self.recovered_frontier_root); + h.update(&self.recovered_indexes_root); + h.finalize().into() + } +} + /// Read-only WAL doctor posture. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum WalDoctorPosture { @@ -5195,6 +5493,64 @@ fn update_len_prefixed(hasher: &mut blake3::Hasher, bytes: &[u8]) { hasher.update(bytes); } +fn update_optional_projection_digest(hasher: &mut blake3::Hasher, digest: Option) { + match digest { + Some(digest) => { + hasher.update(&[1]); + hasher.update(&digest); + } + None => { + hasher.update(&[0]); + } + } +} + +fn update_optional_writer_epoch_id(hasher: &mut blake3::Hasher, epoch_id: Option) { + match epoch_id { + Some(epoch_id) => { + hasher.update(&[1]); + hasher.update(&epoch_id.as_hash()); + } + None => { + hasher.update(&[0]); + } + } +} + +fn update_optional_lsn(hasher: &mut blake3::Hasher, lsn: Option) { + match lsn { + Some(lsn) => { + hasher.update(&[1]); + hasher.update(&lsn.as_u64().to_le_bytes()); + } + None => { + hasher.update(&[0]); + } + } +} + +fn update_recovery_tail_posture(hasher: &mut blake3::Hasher, posture: RecoveryTailPosture) { + match posture { + RecoveryTailPosture::Clean => { + hasher.update(&[0]); + } + RecoveryTailPosture::TruncatedAll => { + hasher.update(&[1]); + } + RecoveryTailPosture::TruncatedAfter(lsn) => { + hasher.update(&[2]); + hasher.update(&lsn.as_u64().to_le_bytes()); + } + RecoveryTailPosture::WouldTruncateAll => { + hasher.update(&[3]); + } + RecoveryTailPosture::WouldTruncateAfter(lsn) => { + hasher.update(&[4]); + hasher.update(&lsn.as_u64().to_le_bytes()); + } + } +} + fn push_hash(out: &mut Vec, hash: &Hash) { out.extend_from_slice(hash); } diff --git a/crates/warp-core/tests/causal_wal_tests.rs b/crates/warp-core/tests/causal_wal_tests.rs index d6ce4e38..1a86be7d 100644 --- a/crates/warp-core/tests/causal_wal_tests.rs +++ b/crates/warp-core/tests/causal_wal_tests.rs @@ -26,12 +26,14 @@ use warp_core::causal_wal::{ MaterializationObservationRecord, MaterializationReplayPosture, MissingMaterialScope, ObjectStoreCapabilityError, ObjectStoreReadAfterWritePosture, ObjectStoreWalCapabilities, PayloadCodecId, PayloadSchemaId, ReadingRefRecord, RecoveredState, RecoveredSubmissionPosture, - RecoveryAccessMode, RecoveryTailPosture, RetainedMaterialKind, RetainedMaterialRecord, - SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, TransactionLocalIndex, - WalAppendAuthority, WalBuildError, WalCommittedTransaction, WalDoctorPosture, - WalDurabilityMode, WalManifest, WalReceiptCorrelationRecord, WalRecordKind, - WalReleaseReadinessGates, WalSchemaLintError, WalSegmentId, WalStoreError, WalStorePort, - WalTickDecision, WalTransactionBuilder, WalTransactionId, WalTransactionKind, WriterEpochId, + RecoveryAccessMode, RecoveryCertificateRef, RecoveryTailPosture, RetainedMaterialKind, + RetainedMaterialRecord, SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, + TransactionLocalIndex, WalAppendAuthority, WalBuildError, WalCommitAnchor, + WalCommittedTransaction, WalDoctorPosture, WalDurabilityMode, WalManifest, + WalReceiptCorrelationRecord, WalRecordKind, WalReleaseReadinessGates, WalRoot, + WalSchemaLintError, WalSegmentId, WalSegmentRef, WalSegmentSealPosture, + WalSegmentStorageLocator, WalStoreError, WalStorePort, WalTickDecision, WalTransactionBuilder, + WalTransactionId, WalTransactionKind, WalWriterEpoch, WriterEpoch, WriterEpochId, WriterEpochRequest, }; use warp_core::Hash; @@ -117,6 +119,134 @@ fn writer_epoch_request() -> WriterEpochRequest { } } +#[test] +fn wal_projection_fact_identity_excludes_absolute_storage_locators() { + let writer_epoch = WalWriterEpoch::from_writer_epoch(&WriterEpoch { + epoch_id: epoch_id(), + storage_fencing_token: digest("projection:fencing"), + process_identity: digest("projection:process"), + host_identity: digest("projection:host"), + started_at_lsn: Lsn::from_raw(7), + previous_epoch_id: Some(WriterEpochId::from_hash(digest( + "projection:previous-epoch", + ))), + previous_epoch_final_commit_digest: Some(digest("projection:previous-final-commit")), + lease_or_lock_evidence: digest("projection:lease"), + }); + let commit_anchor = WalCommitAnchor { + transaction_id: transaction_id("projection:tx"), + commit_digest: digest("projection:commit"), + first_lsn: Lsn::from_raw(7), + last_lsn: Lsn::from_raw(9), + record_count: 3, + }; + let relative_locator = WalSegmentStorageLocator::RelativePath(PathBuf::from("segments/0001")); + let absolute_locator = + WalSegmentStorageLocator::AbsolutePath(PathBuf::from("/var/tmp/echo/wal/segments/0001")); + let segment = WalSegmentRef { + writer_epoch: writer_epoch.epoch_id, + segment_id: WalSegmentId::from_raw(1), + first_lsn: Lsn::from_raw(7), + last_lsn: Lsn::from_raw(9), + previous_commit_digest: digest("projection:previous-commit"), + final_commit_digest: digest("projection:commit"), + segment_digest: digest("projection:segment"), + commit_anchors: vec![commit_anchor.clone()], + seal_posture: WalSegmentSealPosture::Sealed { + sealed_lsn: Some(Lsn::from_raw(9)), + }, + storage_locator: Some(relative_locator), + }; + let relocated_segment = WalSegmentRef { + storage_locator: Some(absolute_locator), + ..segment.clone() + }; + + assert_eq!( + segment.identity_digest(), + relocated_segment.identity_digest() + ); + assert_ne!(segment.storage_locator, relocated_segment.storage_locator); + + let changed_segment_digest = WalSegmentRef { + segment_digest: digest("projection:other-segment"), + ..segment.clone() + }; + assert_ne!( + segment.identity_digest(), + changed_segment_digest.identity_digest() + ); + + let changed_anchor = WalSegmentRef { + commit_anchors: vec![WalCommitAnchor { + commit_digest: digest("projection:other-commit"), + ..commit_anchor + }], + ..segment.clone() + }; + assert_ne!(segment.identity_digest(), changed_anchor.identity_digest()); + + let changed_seal_posture = WalSegmentRef { + seal_posture: WalSegmentSealPosture::Open, + ..segment.clone() + }; + assert_ne!( + segment.identity_digest(), + changed_seal_posture.identity_digest() + ); + + let recovery = RecoveryCertificateRef { + certificate_digest: digest("projection:certificate"), + checkpoint_used: Some(digest("projection:checkpoint")), + first_lsn: Some(Lsn::from_raw(7)), + last_lsn: Some(Lsn::from_raw(9)), + tail_posture: RecoveryTailPosture::Clean, + recovered_frontier_root: digest("projection:frontier"), + recovered_indexes_root: digest("projection:indexes"), + }; + let second_anchor = WalCommitAnchor { + transaction_id: transaction_id("projection:tx:second"), + commit_digest: digest("projection:second-commit"), + first_lsn: Lsn::from_raw(10), + last_lsn: Lsn::from_raw(12), + record_count: 3, + }; + let second_segment = WalSegmentRef { + writer_epoch: writer_epoch.epoch_id, + segment_id: WalSegmentId::from_raw(2), + first_lsn: Lsn::from_raw(10), + last_lsn: Lsn::from_raw(12), + previous_commit_digest: digest("projection:commit"), + final_commit_digest: digest("projection:second-commit"), + segment_digest: digest("projection:second-segment"), + commit_anchors: vec![second_anchor], + seal_posture: WalSegmentSealPosture::Sealed { + sealed_lsn: Some(Lsn::from_raw(12)), + }, + storage_locator: Some(WalSegmentStorageLocator::RelativePath(PathBuf::from( + "segments/0002", + ))), + }; + let root = WalRoot { + root_digest: digest("projection:root"), + writer_epochs: vec![writer_epoch.clone()], + segments: vec![relocated_segment, second_segment.clone()], + recovery_certificate: Some(recovery.clone()), + }; + let reordered_root = WalRoot { + root_digest: digest("projection:root"), + writer_epochs: vec![writer_epoch], + segments: vec![second_segment, segment.clone()], + recovery_certificate: Some(recovery), + }; + + assert_eq!( + root.segments[0].identity_digest(), + segment.identity_digest() + ); + assert_eq!(root.identity_digest(), reordered_root.identity_digest()); +} + fn builder( transaction_id: WalTransactionId, first_lsn: Lsn, diff --git a/docs/BEARING.md b/docs/BEARING.md index 9be40ece..0b774ecd 100644 --- a/docs/BEARING.md +++ b/docs/BEARING.md @@ -1066,6 +1066,16 @@ completed slice. - Test plan: joined runtime WAL durability witness: `cargo xtask test-slice durable-runtime-wal`. +- [x] **Slice 97: WAL projection fact types** + - User story: As recovery and projection, WAL roots, writer epochs, + segments, commit anchors, and recovery certificates need typed read-model + evidence before WSC export/import work can depend on them. + - Acceptance criteria: `WalSegmentRef` identity binds writer epoch, LSN + range, commit digest chain, segment digest, commit anchors, and seal + posture while excluding raw storage locators from causal identity. + - Test plan: WAL projection identity witness: + `cargo test -p warp-core wal_projection_fact_identity`. + ## Recently Completed Slice Batch 1. **Contract-Aware Receipts And Readings**