diff --git a/distribution/engine.go b/distribution/engine.go index e840a23b..bc414af4 100644 --- a/distribution/engine.go +++ b/distribution/engine.go @@ -179,6 +179,21 @@ func (s RouteHistorySnapshot) OwnerOf(key []byte) (uint64, bool) { return 0, false } +// Current returns the route catalog snapshot at the engine's current +// catalogVersion. Returns (zero, false) when the history ring has +// not been initialised (bare-struct Engine). Used by the M3 +// Composed-1 cross-version-read fence (design doc §4.4) — the gate +// compares the txn's observed-version owner against the current +// owner so a route shift between BeginTxn and Commit is caught +// before it can produce a G1c anomaly across a cross-group +// MoveRange / SplitRange. +func (e *Engine) Current() (RouteHistorySnapshot, bool) { + e.mu.RLock() + defer e.mu.RUnlock() + snap, ok := e.history[e.catalogVersion] + return snap, ok +} + // SnapshotAt returns the route catalog snapshot recorded at version v. // Returns (zero, false) when v is not in the ring — either because v // is in the future (> catalogVersion), or because the FIFO ring has @@ -193,6 +208,26 @@ func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool) { return snap, ok } +// SetHistoryDepthForTest overrides the FIFO ring depth from outside +// the package. Test-only — callers MUST set the depth before the +// Engine is shared with any concurrent reader (no internal +// synchronisation here for the same reason TestEngineSnapshotAt_FIFOEviction +// does the direct field write in-package; this seam exposes the +// equivalent capability to external test packages that need a +// small depth to exercise eviction without overwhelming TLC-style +// bounded scenarios — claude review on PR #894). +// +// Production code must use DefaultRouteHistoryDepth (32) or a +// future operator-exposed config knob; this seam is build-time +// equivalent to direct field access and exists ONLY so tests in +// the kv package can drive eviction-trigger scenarios without +// adding a constructor option just for tests. +func (e *Engine) SetHistoryDepthForTest(depth int) { + e.mu.Lock() + defer e.mu.Unlock() + e.historyDepth = depth +} + // HistoryDepth returns the configured ring depth for diagnostics. func (e *Engine) HistoryDepth() int { e.mu.RLock() diff --git a/kv/fsm.go b/kv/fsm.go index 4f6e43bc..75e15bfe 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -86,16 +86,24 @@ type kvFSM struct { } // RouteHistory is the kv-side interface to the route catalog's -// versioned-snapshot ring. *distribution.Engine satisfies it. -// Defined in the kv package so kvFSM does not have to import a -// concrete type for the field; the M3 verifyComposed1 gate uses -// only SnapshotAt and the returned snapshot's OwnerOf, so the -// interface stays minimal. +// versioned-snapshot ring. *distribution.Engine satisfies it via +// WrapDistributionEngine. Defined in the kv package so kvFSM does +// not have to import a concrete type for the field; the M3 +// verifyComposed1 gate uses only SnapshotAt + Current + the returned +// snapshot's OwnerOf, so the interface stays minimal. type RouteHistory interface { // SnapshotAt returns the route catalog at the given catalog // version. Returns (zero, false) when the version is outside - // the ring (either evicted by depth, or in the future). + // the ring (either evicted by depth, or in the future). The + // M3 gate maps the not-found case to ErrComposed1VersionGCd. SnapshotAt(version uint64) (RouteSnapshot, bool) + // Current returns the route catalog snapshot at the engine's + // current catalog version. Returns (zero, false) when the + // engine has no history (bare-struct case used by some test + // seams). The M3 cross-version fence uses this to compare + // the txn's observed-version owner against the current + // owner — a mismatch is the §3 codex P1 trace. + Current() (RouteSnapshot, bool) } // RouteSnapshot is the historical view of the route catalog at a @@ -199,6 +207,38 @@ var _ raftengine.StateMachine = (*kvFSM)(nil) var ErrUnknownRequestType = errors.New("unknown request type") +// ErrComposed1Violation is returned by verifyComposed1 when the +// transaction's commit cannot proceed on this Raft group because the +// txn's read-set or write-set keys are not owned by this group at +// either the txn's observed catalog version (the spec-level §4.2(a) +// check) or the current catalog version observed by the FSM at apply +// time (the §4.4 cross-version-read fence). Surfaces to the +// coordinator as a retryable error: the M4 coordinator path re-reads +// the route cache, re-routes the txn, and re-issues it once on the +// new owning group. +// +// Wrapped with errors.Wrapf at the call site to carry the +// per-key diagnostic (which key, which observed-version owner, which +// current-version owner) — the caller's retry path uses +// errors.Is(err, ErrComposed1Violation) to match. +var ErrComposed1Violation = errors.New("composed-1: route ownership shifted; retry on new owning group") + +// ErrComposed1VersionGCd is returned by verifyComposed1 when the +// txn's observed catalog version is no longer in the engine's +// retention ring — either because the FIFO ring evicted it (the +// txn lived longer than `routeHistoryDepth` versions worth of +// catalog churn) or because the version was never seen on this +// node. Surfaces to the coordinator as a retryable error: the +// caller's M4 retry path reads the current route cache and +// re-issues the txn with a fresh observedVer. +// +// The not-found ⇒ hard-error semantics (rather than soft-pass) +// matters because a soft-pass would let the gate be bypassed +// exactly in the long-running-txn / high-churn cases where the +// cross-version-read hazard is most likely (design doc §4.3 + +// gemini medium + codex P2 on PR #870). +var ErrComposed1VersionGCd = errors.New("composed-1: observed catalog version evicted from history ring; retry") + type fsmApplyResponse struct { results []error } @@ -493,6 +533,9 @@ func (f *kvFSM) RestoredCutover() uint64 { } func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error { + if err := f.verifyComposed1(r); err != nil { + return err + } switch r.Phase { case pb.Phase_PREPARE: return f.handlePrepareRequest(ctx, r) @@ -507,6 +550,88 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui } } +// verifyComposed1 is the M3 apply-time Composed-1 gate per +// docs/design/2026_05_29_proposed_composed1_cross_group_commit_guard.md +// §4.2(a) + §4.4. Runs two checks before the txn's writes land: +// +// (a) Observed-version owner — the txn's read-set was captured +// at routes[observedVer], so every write key must be owned +// by THIS Raft group at that historical version. Matches +// the spec-level Commit precondition in tla/composed/Composed.tla. +// +// (b) Current-version owner — even when (a) passes, a route +// shift between BeginTxn and Commit can leave the write +// landing on the OLD owner while readers at the new +// version route to the NEW owner and miss the write (the +// §3 codex P1 G1c trace). The current-version fence +// refuses the commit when this group no longer owns the +// key, forcing a coordinator retry on the new owner. +// +// Short-circuits cleanly in three legacy / not-applicable cases: +// - FSM was constructed without WithRouteHistory (legacy / test +// seam): routes == nil, return nil. +// - Request carries ObservedRouteVersion == 0 (unpinned — +// pre-M1 caller, or ABORT request that doesn't carry the +// version): return nil. +// - Engine.Current returns (zero, false) — the engine has no +// history (bare-struct test seam): return nil at the (b) check. +// +// Returns ErrComposed1VersionGCd when the observed version is +// outside the ring (M4 retry), and ErrComposed1Violation wrapped +// with per-key context otherwise. +func (f *kvFSM) verifyComposed1(r *pb.Request) error { + if f.routes == nil { + return nil + } + observedVer := r.GetObservedRouteVersion() + if observedVer == 0 { + return nil + } + + // (a) Observed-version check. + observedSnap, ok := f.routes.SnapshotAt(observedVer) + if !ok { + return errors.WithStack(ErrComposed1VersionGCd) + } + if err := f.verifyOwnerFromSnapshot(r.GetMutations(), observedSnap, observedVer, "observed"); err != nil { + return err + } + + // (b) Current-version cross-version-read fence. + currentSnap, ok := f.routes.Current() + if !ok { + // No current snapshot — engine has no history, nothing + // to compare against. Fall through (matches the + // short-circuit posture of an unwired FSM). + return nil + } + return f.verifyOwnerFromSnapshot(r.GetMutations(), currentSnap, currentSnap.Version(), "current") +} + +// verifyOwnerFromSnapshot is the shared per-mutation owner-check +// loop used by verifyComposed1's observed-version and current- +// version passes. `phase` is the diagnostic label ("observed" / +// "current") that ends up in the wrapped error. isTxnInternalKey +// mutations (the TxnMeta marker prefix) are skipped — they are +// always on every shard and have no Composed-1 ownership. +func (f *kvFSM) verifyOwnerFromSnapshot(mutations []*pb.Mutation, snap RouteSnapshot, snapVer uint64, phase string) error { + for _, mut := range mutations { + if mut == nil || len(mut.Key) == 0 { + continue + } + if isTxnInternalKey(mut.Key) { + continue + } + owner, found := snap.OwnerOf(mut.Key) + if !found || owner != f.shardGroupID { + return errors.Wrapf(ErrComposed1Violation, + "%s-version v=%d: key %q owned by group %d (found=%v); this FSM serves group %d", + phase, snapVer, mut.Key, owner, found, f.shardGroupID) + } + } + return nil +} + func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, startTS uint64) error { seen := make(map[string]struct{}, len(muts)) for _, mut := range muts { diff --git a/kv/fsm_composed1_test.go b/kv/fsm_composed1_test.go new file mode 100644 index 00000000..a323fbad --- /dev/null +++ b/kv/fsm_composed1_test.go @@ -0,0 +1,218 @@ +package kv + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// applyComposed1Snapshot is a small helper that wraps the boilerplate +// of pushing a CatalogSnapshot into an Engine with a single +// key-range → group mapping (the typical M3 test shape). Returns the +// version applied so the caller can pin ObservedRouteVersion on +// crafted pb.Requests. +func applyComposed1Snapshot(t *testing.T, e *distribution.Engine, version uint64, routes []distribution.RouteDescriptor) { + t.Helper() + require.NoError(t, e.ApplySnapshot(distribution.CatalogSnapshot{ + Version: version, + Routes: routes, + })) +} + +// newComposed1FSM constructs a kvFSM wired with the engine + the +// shard group ID the gate compares against. Production wiring lives +// in main.go's buildShardGroups; this helper short-circuits to the +// test-only fixture without spinning up a real Raft group. +// +//nolint:unparam // shardGroupID is currently always 1 in tests but +// the helper keeps it as a parameter so a future test can exercise +// the "wrong-group" case without re-deriving the boilerplate. +func newComposed1FSM(t *testing.T, e *distribution.Engine, shardGroupID uint64) *kvFSM { + t.Helper() + fsmIface := NewKvFSMWithHLC(store.NewMVCCStore(), NewHLC(), + WithRouteHistory(WrapDistributionEngine(e), shardGroupID)) + fsm, ok := fsmIface.(*kvFSM) + require.True(t, ok) + return fsm +} + +// commitTxnRequest builds a single-shard one-phase pb.Request for the +// Composed-1 gate tests. Only the fields the gate consults +// (ObservedRouteVersion + Mutations[].Key) need to be set; the +// downstream phase handlers are NOT exercised here — we only assert +// the gate's behaviour, so a malformed-from-the-handler-view request +// is fine. +func commitTxnRequest(observedVer uint64, keys ...string) *pb.Request { + muts := make([]*pb.Mutation, 0, len(keys)) + for _, k := range keys { + muts = append(muts, &pb.Mutation{Op: pb.Op_PUT, Key: []byte(k), Value: []byte("v")}) + } + return &pb.Request{ + IsTxn: true, + Phase: pb.Phase_NONE, + ObservedRouteVersion: observedVer, + Mutations: muts, + } +} + +// TestVerifyComposed1_StaleObservedVersionWithMovedKeyFails is the M3 +// "Done when" criterion (i) from the design doc: a txn that observed +// catalog version v_obs, where the key was owned by g1, now committing +// on g1 AFTER ApplySnapshot moved the key to g2 at v_obs+1 — the +// observed-version owner check at v_obs+1 (this group is g1 but the +// snapshot at v_obs+1 says the owner is g2) fails closed with +// ErrComposed1Violation. +// +// Wait — re-read the design doc: criterion (i) is "stale +// ObservedRouteVersion with the key moved → ErrComposed1Violation", +// meaning the OBSERVED version's snapshot resolves the key to a +// DIFFERENT group than the FSM's shardGroupID. The classic +// MoveRange scenario from the doc's §3 isn't this — that's +// criterion (iii). Criterion (i) is the spec-level Composed-1 +// straight-up: txn pinned v=N, but routes[N][k] ≠ this group. +func TestVerifyComposed1_StaleObservedVersionWithMovedKeyFails(t *testing.T) { + t.Parallel() + + // At v=1, key "k" is owned by group 2. But this FSM serves + // group 1 — a commit pinned at v=1 with key "k" must fail + // the observed-version owner check. + e := distribution.NewEngine() + applyComposed1Snapshot(t, e, 1, []distribution.RouteDescriptor{ + {RouteID: 100, Start: []byte(""), End: nil, GroupID: 2, State: distribution.RouteStateActive}, + }) + fsm := newComposed1FSM(t, e, 1) // this FSM is for group 1 + + err := fsm.handleTxnRequest(context.Background(), commitTxnRequest(1, "k"), 0) + require.ErrorIs(t, err, ErrComposed1Violation, + "observed-version owner check must reject a commit that lands on a group different from the historical owner") + require.Contains(t, err.Error(), "observed-version", + "the wrapped diagnostic must identify which check fired (observed vs current) so M4 retry can pick the right strategy") +} + +// TestVerifyComposed1_ObservedVersionOlderThanRingFails is the M3 +// "Done when" criterion (ii) from the design doc: a txn that observed +// a catalog version no longer in the ring (because the FIFO +// evicted it) surfaces as ErrComposed1VersionGCd, not +// ErrComposed1Violation. The distinction matters because the M4 +// coordinator retry path may want to treat the two differently +// (the violation is "route shifted, re-route"; the GCd is "version +// evicted, re-read catalog and re-issue"). +func TestVerifyComposed1_ObservedVersionOlderThanRingFails(t *testing.T) { + t.Parallel() + + e := distribution.NewEngine() + // Tiny depth so the eviction trigger is bounded. Safe direct + // write: e is local to this goroutine and the depth is set + // before any ApplySnapshot fires. + e.SetHistoryDepthForTest(2) + for v := uint64(1); v <= 5; v++ { + applyComposed1Snapshot(t, e, v, []distribution.RouteDescriptor{ + {RouteID: 100, Start: []byte(""), End: nil, GroupID: 1, State: distribution.RouteStateActive}, + }) + } + // At depth=2, only versions 4 and 5 are retained; v=2 has been + // evicted long ago. + fsm := newComposed1FSM(t, e, 1) + + err := fsm.handleTxnRequest(context.Background(), commitTxnRequest(2, "k"), 0) + require.ErrorIs(t, err, ErrComposed1VersionGCd, + "a txn observing a version outside the retention ring must surface ErrComposed1VersionGCd so M4 retry can re-read the catalog") +} + +// TestVerifyComposed1_ObservedPassesButCurrentDiffersFails is the M3 +// "Done when" criterion (iii) — the §3 codex P1 trace surfaced on +// PR #870. Step-by-step: +// +// 1. At v=1, key "k1" is owned by g1. Txn pins observedVer=1. +// 2. ApplySnapshot lands v=2 with k1 owned by g2. +// 3. Txn commits on g1 (it routed via its observed catalog). +// 4. Observed-version check at v=1 passes (routes[1][k1] = g1, this FSM +// serves g1). +// 5. Current-version check at v=2 fails (routes[2][k1] = g2, this FSM +// serves g1) — ErrComposed1Violation. +// +// Without the (b) cross-version fence, the commit would land on g1 +// while readers at v=2 route to g2 and miss the write — exactly the +// G1c anomaly Composed-1a in the TLA+ spec (and PR #878) closes. +func TestVerifyComposed1_ObservedPassesButCurrentDiffersFails(t *testing.T) { + t.Parallel() + + e := distribution.NewEngine() + // v=1: k1 owned by g1 + applyComposed1Snapshot(t, e, 1, []distribution.RouteDescriptor{ + {RouteID: 100, Start: []byte(""), End: nil, GroupID: 1, State: distribution.RouteStateActive}, + }) + // v=2: k1 moved to g2 + applyComposed1Snapshot(t, e, 2, []distribution.RouteDescriptor{ + {RouteID: 101, Start: []byte(""), End: nil, GroupID: 2, State: distribution.RouteStateActive}, + }) + // This FSM serves g1 — the observed-version snapshot at v=1 + // agrees (the txn legitimately routed here at txn-begin time) + // but the current snapshot at v=2 says the key has moved off. + fsm := newComposed1FSM(t, e, 1) + + err := fsm.handleTxnRequest(context.Background(), commitTxnRequest(1, "k1"), 0) + require.ErrorIs(t, err, ErrComposed1Violation, + "the §4.4 current-version fence must reject the codex P1 trace: observed-version check passes (routes[1][k1]=g1) but the current snapshot at v=2 has moved k1 to g2") + require.Contains(t, err.Error(), "current-version", + "the wrapped diagnostic must identify the current-version fence as the rejecting check so M4 retry knows to re-route, not re-read the catalog") +} + +// TestVerifyComposed1_ObservedVersionZeroSkipsGate documents the +// legacy-default behaviour: a txn with ObservedRouteVersion == 0 +// (no M1 wiring) skips the gate entirely. Combined with M1's +// behaviour-neutral default (every existing caller leaves the +// field at zero), this is what keeps M3 from regressing the +// pre-feature posture for any caller that hasn't migrated yet. +func TestVerifyComposed1_ObservedVersionZeroSkipsGate(t *testing.T) { + t.Parallel() + + e := distribution.NewEngine() + applyComposed1Snapshot(t, e, 1, []distribution.RouteDescriptor{ + // Routes[1] says owner is g2 — if the gate ran, this would + // trip Composed1Violation for an FSM serving g1. But the + // gate short-circuits on ObservedRouteVersion==0. + {RouteID: 100, Start: []byte(""), End: nil, GroupID: 2, State: distribution.RouteStateActive}, + }) + fsm := newComposed1FSM(t, e, 1) + + err := fsm.handleTxnRequest(context.Background(), commitTxnRequest(0, "k"), 0) + // We expect either nil (gate skipped) or some non-Composed-1 + // error from the downstream phase handler (which we did not + // fully set up). What we MUST NOT see is Composed1Violation, + // because the gate must have short-circuited. + if err != nil { + require.False(t, errors.Is(err, ErrComposed1Violation), + "ObservedRouteVersion=0 (legacy caller) must skip the Composed-1 gate") + require.False(t, errors.Is(err, ErrComposed1VersionGCd), + "ObservedRouteVersion=0 must not surface ErrComposed1VersionGCd") + } +} + +// TestVerifyComposed1_NilRouteHistorySkipsGate documents the +// unwired-FSM default: a kvFSM constructed without WithRouteHistory +// has routes=nil and the gate short-circuits. Matches the +// pre-feature posture byte-for-byte for callers (test harnesses, +// the pre-M2 single-binary demo) that have not been updated. +func TestVerifyComposed1_NilRouteHistorySkipsGate(t *testing.T) { + t.Parallel() + + // No WithRouteHistory option — routes stays nil. + fsmIface := NewKvFSMWithHLC(store.NewMVCCStore(), NewHLC()) + fsm, ok := fsmIface.(*kvFSM) + require.True(t, ok) + require.Nil(t, fsm.routes) + + err := fsm.handleTxnRequest(context.Background(), commitTxnRequest(42, "k"), 0) + if err != nil { + require.False(t, errors.Is(err, ErrComposed1Violation), + "unwired FSM must not surface Composed1Violation") + require.False(t, errors.Is(err, ErrComposed1VersionGCd), + "unwired FSM must not surface ErrComposed1VersionGCd") + } +} diff --git a/kv/route_history.go b/kv/route_history.go index 7716e422..9a85e486 100644 --- a/kv/route_history.go +++ b/kv/route_history.go @@ -41,6 +41,14 @@ func (a *distributionEngineAdapter) SnapshotAt(v uint64) (RouteSnapshot, bool) { return distributionRouteSnapshot{snap: snap}, true } +func (a *distributionEngineAdapter) Current() (RouteSnapshot, bool) { + snap, ok := a.e.Current() + if !ok { + return nil, false + } + return distributionRouteSnapshot{snap: snap}, true +} + type distributionRouteSnapshot struct { snap distribution.RouteHistorySnapshot }