diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml index a330325539..e4e6929d2b 100644 --- a/.github/workflows/go-test.yml +++ b/.github/workflows/go-test.yml @@ -15,6 +15,9 @@ concurrency: env: GO_VERSION: '1.25.6' GO_TEST_TIMEOUT: 30m + # state_db tests run in their own workflow (state_db-tests.yml); exclude + # that subtree everywhere in this workflow to avoid double-running them. + STATE_DB_PKG_PREFIX: github.com/sei-protocol/sei-chain/sei-db/state_db jobs: test: @@ -46,9 +49,10 @@ jobs: - name: Go test run: | - go test \ - -timeout=${{ env.GO_TEST_TIMEOUT }} \ - ./... + set -euo pipefail + PKGS=$(go list ./... | grep -v "^${STATE_DB_PKG_PREFIX}") + echo "$PKGS" | xargs go test \ + -timeout=${{ env.GO_TEST_TIMEOUT }} coverage: name: Coverage @@ -111,12 +115,15 @@ jobs: exit 0 fi - # Resolve changed directories to Go packages. + # Resolve changed directories to Go packages, then drop the + # state_db subtree (covered by state_db-tests.yml) so it neither + # gets tested nor measured for coverage here. DIRECT_PKGS=$(printf "%s\n" "$CHANGED_GO" \ | xargs -r -n1 dirname \ | sort -u \ | while read -r d; do go list "./$d" 2>/dev/null || true; done \ - | sort -u) + | sort -u \ + | grep -v "^${STATE_DB_PKG_PREFIX}" || true) if [[ -z "$DIRECT_PKGS" ]]; then echo "skip=true" >> "$GITHUB_OUTPUT" @@ -125,6 +132,7 @@ jobs: # Find one level of reverse dependencies (packages that import # the changed packages) so their tests exercise our changes too. + # Also strip state_db reverse deps; state_db-tests.yml owns them. ALL_IMPORTS=$(go list -f '{{.ImportPath}} {{join .Imports " "}} {{join .TestImports " "}}' ./...) 
REV_DEPS=$(awk ' @@ -135,7 +143,9 @@ jobs: if ($i in targets) { print pkg; break } } } - ' <(printf "%s\n" "$DIRECT_PKGS") <(printf "%s\n" "$ALL_IMPORTS") | sort -u) + ' <(printf "%s\n" "$DIRECT_PKGS") <(printf "%s\n" "$ALL_IMPORTS") \ + | sort -u \ + | grep -v "^${STATE_DB_PKG_PREFIX}" || true) # Merge direct + reverse-dep packages, deduplicate. TEST_PKGS=$(printf "%s\n%s\n" "$DIRECT_PKGS" "$REV_DEPS" \ @@ -165,12 +175,18 @@ jobs: - name: Go test with coverage (full path) if: github.event_name != 'merge_group' && github.event_name != 'pull_request' run: | + set -euo pipefail + PKG_LIST=$(go list ./... | grep -v "^${STATE_DB_PKG_PREFIX}") + PKGS=$(echo "$PKG_LIST" | tr '\n' ' ') + COVERPKG=$(echo "$PKG_LIST" | paste -sd, -) + # Pass $PKGS unquoted so go test gets one invocation with N args, + # not N invocations via xargs (which would each clobber coverage.out). go test \ -timeout=${{ env.GO_TEST_TIMEOUT }} \ -covermode=atomic \ -coverprofile=coverage.out \ - -coverpkg=./... \ - ./... + -coverpkg="$COVERPKG" \ + $PKGS - name: Upload coverage to Codecov if: github.event_name != 'merge_group' && (github.event_name != 'pull_request' || steps.cov-pkgs.outputs.skip != 'true') diff --git a/.github/workflows/state_db-tests.yml b/.github/workflows/state_db-tests.yml new file mode 100644 index 0000000000..649b31733a --- /dev/null +++ b/.github/workflows/state_db-tests.yml @@ -0,0 +1,194 @@ +name: DB +on: + workflow_call: + pull_request: + merge_group: + push: + branches: + - main + - release/** + +concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }} + +env: + GO_VERSION: '1.25.6' + GO_TEST_TIMEOUT: 30m + STATE_DB_PKG_PREFIX: github.com/sei-protocol/sei-chain/sei-db/state_db + +jobs: + test: + name: Race Detection + runs-on: uci-default + env: + GOFLAGS: -race -tags=ledger,test_ledger_mock + steps: + - uses: actions/checkout@v5 + with: + fetch-depth: 1 + + - uses: 
actions/setup-go@v6 + with: + go-version: ${{ env.GO_VERSION }} + cache: false + + - name: Login to Docker Hub + uses: docker/login-action@v3 + if: env.DOCKERHUB_USERNAME != '' + env: + DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Download modules + run: go mod download + + - name: Go test + run: | + go test \ + -timeout=${{ env.GO_TEST_TIMEOUT }} \ + ./sei-db/state_db/... + + coverage: + name: Coverage + runs-on: ${{ github.event_name == 'merge_group' && 'ubuntu-latest' || 'uci-default' }} + # Skip coverage report for merge groups, since the queue is about safety check. The merge to main will run coverage anyway. + # GitHub does not support setting "Required" CI workflows for merge queue separately from PRs. If we skip the job at top + # level we then have to work around result not being present. Hence, the repeated if statements per step. + env: + GOFLAGS: -tags=ledger,test_ledger_mock + steps: + - name: Check trigger + if: github.event_name == 'merge_group' + run: echo 'Coverage skipped in merge queue' + + - uses: actions/checkout@v5 + if: github.event_name != 'merge_group' + with: + # Depth 0 for PRs so merge-base diff against the base branch works. 
+ fetch-depth: ${{ github.event_name == 'pull_request' && '0' || '1' }} + + - uses: actions/setup-go@v6 + if: github.event_name != 'merge_group' + with: + go-version: ${{ env.GO_VERSION }} + cache: false + + - name: Login to Docker Hub + uses: docker/login-action@v3 + if: github.event_name != 'merge_group' && env.DOCKERHUB_USERNAME != '' + env: + DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Download modules + if: github.event_name != 'merge_group' + run: go mod download + + - name: Determine coverage packages (PR) + id: cov-pkgs + if: github.event_name == 'pull_request' + run: | + set -euo pipefail + + BASE_SHA="${{ github.event.pull_request.base.sha }}" + HEAD_SHA="${{ github.event.pull_request.head.sha }}" + + # Use explicit SHAs — works regardless of checkout depth. + CHANGED=$(git diff --name-only "$BASE_SHA"..."$HEAD_SHA") + + # Keep only existing .go files (filters out deletions). + CHANGED_GO=$(printf "%s\n" "$CHANGED" \ + | grep -E '\.go$' \ + | while read -r f; do [ -f "$f" ] && echo "$f"; done \ + || true) + + if [[ -z "$CHANGED_GO" ]]; then + echo "skip=true" >> "$GITHUB_OUTPUT" + exit 0 + fi + + # Resolve changed directories to Go packages, then keep only + # the state_db subtree — this workflow exclusively owns those + # packages, and the main go-test.yml workflow owns the rest. + DIRECT_PKGS=$(printf "%s\n" "$CHANGED_GO" \ + | xargs -r -n1 dirname \ + | sort -u \ + | while read -r d; do go list "./$d" 2>/dev/null || true; done \ + | sort -u \ + | grep "^${STATE_DB_PKG_PREFIX}" || true) + + if [[ -z "$DIRECT_PKGS" ]]; then + echo "skip=true" >> "$GITHUB_OUTPUT" + exit 0 + fi + + # Find one level of reverse dependencies (packages that import + # the changed packages) so their tests exercise our changes too. + # Restrict reverse deps to the state_db subtree as well. 
+ ALL_IMPORTS=$(go list -f '{{.ImportPath}} {{join .Imports " "}} {{join .TestImports " "}}' ./...) + + REV_DEPS=$(awk ' + NR == FNR { if (NF) targets[$1] = 1; next } + { + pkg = $1 + for (i = 2; i <= NF; i++) { + if ($i in targets) { print pkg; break } + } + } + ' <(printf "%s\n" "$DIRECT_PKGS") <(printf "%s\n" "$ALL_IMPORTS") \ + | sort -u \ + | grep "^${STATE_DB_PKG_PREFIX}" || true) + + # Merge direct + reverse-dep packages, deduplicate. + TEST_PKGS=$(printf "%s\n%s\n" "$DIRECT_PKGS" "$REV_DEPS" \ + | awk 'NF && !seen[$0]++ { printf "%s ", $0 }') + TEST_PKGS="${TEST_PKGS% }" + + # coverpkg stays as the direct changed packages — we want coverage + # measured on the code that actually changed, but tested via a wider + # set of packages (including reverse deps). + COV_PKGS=$(printf "%s\n" "$DIRECT_PKGS" \ + | awk 'NF { printf "%s%s", sep, $0; sep = "," }') + + echo "test_packages=$TEST_PKGS" >> "$GITHUB_OUTPUT" + echo "coverpkg=$COV_PKGS" >> "$GITHUB_OUTPUT" + echo "skip=false" >> "$GITHUB_OUTPUT" + + - name: Go test with coverage (PR fast path) + if: github.event_name == 'pull_request' && steps.cov-pkgs.outputs.skip != 'true' + run: | + go test \ + -timeout=${{ env.GO_TEST_TIMEOUT }} \ + -covermode=atomic \ + -coverprofile=coverage.out \ + -coverpkg=${{ steps.cov-pkgs.outputs.coverpkg }} \ + ${{ steps.cov-pkgs.outputs.test_packages }} + + - name: Go test with coverage (full path) + if: github.event_name != 'merge_group' && github.event_name != 'pull_request' + run: | + go test \ + -timeout=${{ env.GO_TEST_TIMEOUT }} \ + -covermode=atomic \ + -coverprofile=coverage.out \ + -coverpkg=./sei-db/state_db/... \ + ./sei-db/state_db/... + + - name: Upload coverage to Codecov + if: github.event_name != 'merge_group' && (github.event_name != 'pull_request' || steps.cov-pkgs.outputs.skip != 'true') + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + # Hard-fail on main (full path); soft on PRs (partial data). 
+ fail_ci_if_error: ${{ github.event_name != 'pull_request' }} + disable_search: 'true' + # PR fast-path coverage is intentionally partial. + # Upload under a separate flag to avoid apples-to-oranges project comparisons. + name: ${{ github.event_name == 'pull_request' && 'sei-db-state-db-pr-coverage' || 'sei-db-state-db-coverage' }} + files: ./coverage.out + flags: ${{ github.event_name == 'pull_request' && 'sei-db-state-db-pr' || 'sei-db-state-db' }} diff --git a/sei-db/state_db/sc/composite/fuzz_crud_test.go b/sei-db/state_db/sc/composite/fuzz_crud_test.go new file mode 100644 index 0000000000..8755fd004b --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_crud_test.go @@ -0,0 +1,53 @@ +package composite + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" +) + +// TestCompositeFuzzAllModes drives a randomized CRUD workload against +// CompositeCommitStore for every config.WriteMode, verifying: +// +// - per-block oracle equivalence on Get / Has (and Iterator / GetProof +// on capability-supporting stores); +// - cs.Commit returns the expected monotonic version, and +// cs.LastCommitInfo / cs.GetLatestVersion agree; +// - end-of-test oracle equivalence across every live key; +// - end-of-test deep inspection of the nested memiavl + flatkv backends +// (every key is placed on the backend the mode dictates, no phantom +// rows on either side, migration metadata in the expected state). +// +// Each mode runs in its own t.Run sub-test so a failure surfaces with the +// mode name attached. The same seed is reused across modes; print the +// seed at the top of the parent test for reproducibility. 
+func TestCompositeFuzzAllModes(t *testing.T) { + const blocks = 100 + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + + cs := newCompositeForMode(t, t.Context(), t.TempDir(), profile) + defer func() { _ = cs.Close() }() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, + defaultWorkloadOpts(blocks)) + + require.Equal(t, int64(blocks), cs.Version(), + "%s: cs.Version must equal blocks after the workload", profile.name) + latest, err := cs.GetLatestVersion() + require.NoError(t, err, "%s: GetLatestVersion", profile.name) + require.Equal(t, int64(blocks), latest, + "%s: GetLatestVersion must equal cs.Version", profile.name) + + verifyReadsEqual(t, cs, oracle) + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} diff --git a/sei-db/state_db/sc/composite/fuzz_edge_cases_test.go b/sei-db/state_db/sc/composite/fuzz_edge_cases_test.go new file mode 100644 index 0000000000..ae6f50502e --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_edge_cases_test.go @@ -0,0 +1,441 @@ +package composite + +import ( + "context" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// TestCompositeFuzzEdgeCases pins down a handful of adversarial +// change-set shapes that the random workloads in +// TestCompositeFuzzAllModes / TestCompositeFuzzStateSync* / +// TestCompositeFuzzRollback exercise too rarely to give them a chance +// to fail individually.
Each sub-test is targeted at a single shape but +// runs across every mode where the shape is meaningful, so the same +// invariants (deep-inspection placement, oracle equivalence, version +// monotonicity, LastCommitInfo agreement) are checked everywhere. +// +// Scope: +// +// - EmptyChangeSets: ApplyChangeSets(nil) + Commit must advance the +// version, leave user-visible data unchanged, and (for active +// migration modes) still let the migration manager progress its +// internal iterator. Run on all 8 modes. +// - SingleKeyBlocks: blocks containing exactly one KVPair, rotated +// across initialStores so every store gets exercised at this volume. +// Catches any code path that hard-codes a "multiple stores per +// batch" assumption. Run on all 8 modes. +// - AllDeleteBlocks: a single block deletes every key the oracle +// knows about (after a small prime). For backendFlatKV / dual-write +// EVM stores this also exercises flatkv's IsDelete pruning: the +// end-of-test physical-row count must be zero in those DBs. +// Run on all 8 modes. +// - MigrationCompletesOnBlock1: opens each active-migration mode +// against an empty (or empty-data) prior-state on-disk layout, +// applies one single-write block to its migration-owned store, and +// asserts the migration completes in that first commit. Catches any +// code path that assumes migration takes at least N>1 blocks to drain. +func TestCompositeFuzzEdgeCases(t *testing.T) { + t.Run("EmptyChangeSets", testEdgeCaseEmptyChangeSets) + t.Run("SingleKeyBlocks", testEdgeCaseSingleKeyBlocks) + t.Run("AllDeleteBlocks", testEdgeCaseAllDeleteBlocks) + t.Run("MigrationCompletesOnBlock1", testEdgeCaseMigrationCompletesOnBlock1) +} + +// testEdgeCaseEmptyChangeSets primes each mode with a small random +// workload and then commits N consecutive empty blocks. Versions must +// keep advancing; oracle reads through cs.Get must keep matching; the +// per-block LastCommitInfo must still expose the right version.
For +// active-migration modes the migration manager is still in the apply +// path and is free to write its own metadata / migrated batches even +// on a user-empty block — those writes are not visible through the +// oracle and the invariants above accommodate them. +func testEdgeCaseEmptyChangeSets(t *testing.T) { + const ( + primeBlocks = 5 + emptyBlocks = 6 + ) + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + cs := newCompositeForMode(t, t.Context(), dir, profile) + defer func() { _ = cs.Close() }() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, + defaultWorkloadOpts(primeBlocks)) + + versionBeforeEmpty := cs.Version() + require.Equal(t, int64(primeBlocks), versionBeforeEmpty) + + for i := 0; i < emptyBlocks; i++ { + expectedVersion := versionBeforeEmpty + int64(i+1) + applyBlockOpsTo(t, cs, blockOps{}, expectedVersion) + verifyReadsEqual(t, cs, oracle) + } + + require.Equal(t, versionBeforeEmpty+int64(emptyBlocks), cs.Version(), + "%s: empty blocks must advance the version", profile.name) + + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} + +// testEdgeCaseSingleKeyBlocks commits N blocks, each containing exactly +// one KVPair, rotated across profile.initialStores. After every block +// the new key must read back through cs.Get / cs.Has. After all blocks +// deepInspectPlacement asserts every per-key placement is correct (no +// "first key in a block" or "single-store-per-block" code path leaked +// keys to the wrong backend). +// +// For active-migration modes the workload is too small to keep +// migration in-flight beyond a single block at the default rate, which +// is exactly the point: the migration manager must accept tiny user +// inputs and still produce a correct steady state. 
+func testEdgeCaseSingleKeyBlocks(t *testing.T) { + const blocks = 32 + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + cs := newCompositeForMode(t, t.Context(), dir, profile) + defer func() { _ = cs.Close() }() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + storeRotation := append([]string(nil), profile.initialStores...) + sort.Strings(storeRotation) + + for i := 0; i < blocks; i++ { + store := storeRotation[i%len(storeRotation)] + var pair *proto.KVPair + if store == keys.EVMStoreKey { + pair = randomEVMKVPair(rng) + } else { + pair = randomKVPair(rng) + } + ops := blockOps{ + changesets: []*proto.NamedChangeSet{ + {Name: store, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{pair}}}, + }, + addedKeys: []keyPair{{store: store, key: string(pair.Key)}}, + } + expectedVersion := int64(i + 1) + applyBlockOpsTo(t, cs, ops, expectedVersion) + oracle.Apply(ops.changesets) + for _, kp := range ops.addedKeys { + keysInUse.Add(kp) + } + + gotVal, gotOK, err := cs.Get(store, pair.Key) + require.NoError(t, err, + "%s block %d: cs.Get store=%q key=%x", profile.name, expectedVersion, store, pair.Key) + require.True(t, gotOK, + "%s block %d: cs.Get must find the just-written key store=%q key=%x", + profile.name, expectedVersion, store, pair.Key) + require.Equal(t, pair.Value, gotVal, + "%s block %d: cs.Get value mismatch on store=%q key=%x", + profile.name, expectedVersion, store, pair.Key) + hasOK, err := cs.Has(store, pair.Key) + require.NoError(t, err, + "%s block %d: cs.Has store=%q key=%x", profile.name, expectedVersion, store, pair.Key) + require.True(t, hasOK, + "%s block %d: cs.Has must agree with cs.Get on the just-written key", profile.name, expectedVersion) + } + + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} + +// testEdgeCaseAllDeleteBlocks primes each mode with a few blocks of +// mixed-store data, then commits a single block whose 
changesets are +// nothing but deletes — one per key in the oracle. Post-block: +// +// - every oracle key must read as not-present through cs.Get / cs.Has; +// - the oracle, after applying the same deletes, must hold no keys in +// any store, and the liveKeySet must be empty. +// +// After that, a small random workload is appended to confirm cs still +// accepts writes / reads correctly post-mass-delete. deepInspectPlacement +// runs only after that workload, so its placement invariants are checked +// against the post-workload oracle rather than against an empty one. +func testEdgeCaseAllDeleteBlocks(t *testing.T) { + const ( + primeBlocks = 5 + afterBlocks = 3 + ) + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + cs := newCompositeForMode(t, t.Context(), dir, profile) + defer func() { _ = cs.Close() }() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, + defaultWorkloadOpts(primeBlocks)) + + deleteOps := buildAllDeleteOps(oracle) + deleteBlock := cs.Version() + 1 + applyBlockOpsTo(t, cs, deleteOps, deleteBlock) + oracle.Apply(deleteOps.changesets) + for _, kp := range deleteOps.removedKeys { + keysInUse.Remove(kp) + } + + for storeName, storeMap := range oracle.stores { + require.Empty(t, storeMap, + "%s: oracle must be empty after mass-delete (store=%q has %d keys remaining)", + profile.name, storeName, len(storeMap)) + } + require.Equal(t, 0, keysInUse.Len(), + "%s: liveKeySet must be empty after mass-delete", profile.name) + + for _, kp := range deleteOps.removedKeys { + gotVal, gotOK, err := cs.Get(kp.store, []byte(kp.key)) + require.NoError(t, err, + "%s: cs.Get after mass-delete store=%q key=%x", profile.name, kp.store, []byte(kp.key)) + require.False(t, gotOK, + "%s: cs.Get must report not-present after mass-delete store=%q key=%x", + profile.name, kp.store, []byte(kp.key)) + require.Nil(t, gotVal, + "%s: cs.Get value must be nil
after mass-delete store=%q key=%x", + profile.name, kp.store, []byte(kp.key)) + hasOK, err := cs.Has(kp.store, []byte(kp.key)) + require.NoError(t, err, + "%s: cs.Has after mass-delete store=%q key=%x", profile.name, kp.store, []byte(kp.key)) + require.False(t, hasOK, + "%s: cs.Has must report not-present after mass-delete store=%q key=%x", + profile.name, kp.store, []byte(kp.key)) + } + + afterOpts := defaultWorkloadOpts(afterBlocks) + afterOpts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, afterOpts) + + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} + +// testEdgeCaseMigrationCompletesOnBlock1 sets up each active-migration +// mode so that, when its migration manager first runs, its source tree +// in memiavl is empty. Under that condition the manager's iterator +// returns boundary=Complete on the very first NextBatch call, the +// composite's first post-open commit writes MigrationVersionKey, and +// the mode behaves like its post-completion equivalent from block 2 +// onward. +// +// The trick is that the composite's ApplyChangeSets short-circuits on +// an empty changeset (the router is never called), so completion only +// fires when a user write reaches the migration manager. Each prior +// mode is primed by a single write to that prior mode's +// migration-owned store, which: +// +// - is sufficient to drive the prior manager's iterator exactly once +// (it sees empty source -> boundary=Complete in that block); +// - lands the prior's write directly in flatkv via the +// "boundary is Complete, forward user writes to new DB" path; and +// - leaves memiavl empty for every store, so the next mode's source +// is empty when its migration runs.
+// +// Catches any code path that silently assumes migration takes at least +// N>1 blocks to drain (off-by-one between the iterator's "Complete" +// signal and the version-key write, "first block is special" branches +// that only fire when there's data to move, etc.). +func testEdgeCaseMigrationCompletesOnBlock1(t *testing.T) { + const afterBlocks = 5 + + for _, profile := range activeMigrationProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + ctx := t.Context() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + primeThroughPriorActiveModesEmpty(t, ctx, dir, profile.writeMode, + oracle, keysInUse, rng) + + cs := newCompositeForMode(t, ctx, dir, profile) + defer func() { _ = cs.Close() }() + + versionBefore := cs.Version() + + triggerStore := migrationOwnedStoreFor(profile.writeMode) + triggerPair := freshPairFor(triggerStore, rng) + triggerOps := blockOps{ + changesets: []*proto.NamedChangeSet{ + {Name: triggerStore, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{triggerPair}}}, + }, + addedKeys: []keyPair{{store: triggerStore, key: string(triggerPair.Key)}}, + } + expectedTriggerVersion := versionBefore + 1 + applyBlockOpsTo(t, cs, triggerOps, expectedTriggerVersion) + oracle.Apply(triggerOps.changesets) + for _, kp := range triggerOps.addedKeys { + keysInUse.Add(kp) + } + + require.True(t, migrationCompleteFor(cs, profile.writeMode), + "%s: migration must complete on the first block after open when memiavl source is empty", + profile.name) + require.False(t, migrationBoundaryPresent(cs), + "%s: MigrationBoundaryKey must be absent after immediate completion", + profile.name) + + afterOpts := defaultWorkloadOpts(afterBlocks) + afterOpts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, afterOpts) + + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} + +// buildAllDeleteOps constructs a blockOps whose changesets delete 
every +// key currently in the oracle. The returned ops can be applied with +// applyBlockOpsTo and the caller is responsible for folding +// removedKeys back into its liveKeySet. Iteration over per-store maps +// and per-key strings is performed in sorted order so the produced +// changeset is byte-stable for a fixed oracle. +func buildAllDeleteOps(oracle *oracleStore) blockOps { + storeNames := make([]string, 0, len(oracle.stores)) + for name, storeMap := range oracle.stores { + if len(storeMap) == 0 { + continue + } + storeNames = append(storeNames, name) + } + sort.Strings(storeNames) + + var ops blockOps + for _, name := range storeNames { + storeMap := oracle.stores[name] + ks := make([]string, 0, len(storeMap)) + for k := range storeMap { + ks = append(ks, k) + } + sort.Strings(ks) + pairs := make([]*proto.KVPair, 0, len(ks)) + for _, k := range ks { + pairs = append(pairs, &proto.KVPair{Key: []byte(k), Delete: true}) + ops.removedKeys = append(ops.removedKeys, keyPair{store: name, key: k}) + } + ops.changesets = append(ops.changesets, &proto.NamedChangeSet{ + Name: name, + Changeset: proto.ChangeSet{Pairs: pairs}, + }) + } + return ops +} + +// primeThroughPriorActiveModesEmpty walks dir through every prior +// active-migration mode of target, in order. For each prior mode it +// opens the mode and applies exactly one block that writes a single +// KVPair to the mode's migration-owned store. That: +// +// - drives the prior manager's iterator exactly once. Because no +// data was ever written to the prior's source tree, NextBatch +// returns boundary=Complete on this first call and the same +// commit writes the prior's MigrationVersionKey; +// - the user's trigger write lands directly in flatkv via the +// manager's "boundary is Complete, forward to new DB" path, so +// memiavl stays empty for every store and the next prior mode +// sees the same empty-source condition. 
+// +// Each trigger write is mirrored into oracle / keysInUse so the +// caller's end-of-test deepInspectPlacement covers the priming data +// too. Does nothing for MigrateEVM (priorActiveModes is empty). +func primeThroughPriorActiveModesEmpty( + t *testing.T, + ctx context.Context, + dir string, + target config.WriteMode, + oracle *oracleStore, + keysInUse *liveKeySet, + rng *testutil.TestRandom, +) { + t.Helper() + for _, priorName := range priorActiveModes(target) { + priorProfile := lookupProfile(priorName) + cs := newCompositeForMode(t, ctx, dir, priorProfile) + + triggerStore := migrationOwnedStoreFor(priorProfile.writeMode) + triggerPair := freshPairFor(triggerStore, rng) + ops := blockOps{ + changesets: []*proto.NamedChangeSet{ + {Name: triggerStore, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{triggerPair}}}, + }, + addedKeys: []keyPair{{store: triggerStore, key: string(triggerPair.Key)}}, + } + expectedVersion := cs.Version() + 1 + applyBlockOpsTo(t, cs, ops, expectedVersion) + oracle.Apply(ops.changesets) + for _, kp := range ops.addedKeys { + keysInUse.Add(kp) + } + + require.True(t, migrationCompleteFor(cs, priorProfile.writeMode), + "prime: prior mode %q migration must complete on block 1 with empty memiavl source", + priorName) + + require.NoError(t, cs.Close(), "prime: Close for prior mode %q", priorName) + } +} + +// migrationOwnedStoreFor returns a store name whose route in the given +// active-migration mode flows through the migration manager. A single +// non-empty write to this store drives the manager's iterator exactly +// once, which — combined with an empty source tree — is the minimal +// way to drive immediate migration completion through the composite +// layer (composite.ApplyChangeSets short-circuits empty changesets, +// and non-owned-store writes route via the passthrough router and +// never reach the manager). 
+func migrationOwnedStoreFor(mode config.WriteMode) string { + switch mode { + case config.MigrateEVM: + return keys.EVMStoreKey + case config.MigrateAllButBank: + // Any store that's neither bank nor EVM works; staking is a + // stable, always-present member of MemIAVLStoreKeys. + return keys.StakingStoreKey + case config.MigrateBank: + return keys.BankStoreKey + default: + panic("migrationOwnedStoreFor: not an active-migration mode") + } +} + +// freshPairFor returns a single random KVPair shaped for store: EVM +// keys use the structured-EVM generator (so nonce / codehash / code / +// storage shapes are all reachable), every other store uses the plain +// 8-byte / 8-byte generator. +func freshPairFor(store string, rng *testutil.TestRandom) *proto.KVPair { + if store == keys.EVMStoreKey { + return randomEVMKVPair(rng) + } + return randomKVPair(rng) +} diff --git a/sei-db/state_db/sc/composite/fuzz_modes_test.go b/sei-db/state_db/sc/composite/fuzz_modes_test.go new file mode 100644 index 0000000000..0e39e288db --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_modes_test.go @@ -0,0 +1,356 @@ +package composite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/config" +) + +// backendID identifies which backend a store's keys live on after a given +// mode reaches its steady state. Used by the end-of-test deep inspection. +type backendID int + +const ( + backendMemiavl backendID = iota + backendFlatKV + // backendDualWriteEVM marks the keys.EVMStoreKey under TestOnlyDualWrite: + // keys must be present in BOTH memiavl AND flatkv. Other dual-write + // stores stay on memiavl only and use backendMemiavl. + backendDualWriteEVM +) + +// modeProfile captures everything the fuzz suite needs to drive and verify +// a single config.WriteMode end-to-end. 
The registry returned by +// allModeProfiles is the single source of truth for per-mode behavior; +// individual tests must not switch on the write mode directly. +type modeProfile struct { + // name is the human-readable mode name used in t.Run sub-tests. + name string + + // writeMode is the WriteMode under test. + writeMode config.WriteMode + + // keysToMigratePerBlock seeds StateCommitConfig.KeysToMigratePerBlock. + // Required to be > 0 by config.Validate; non-migrating modes still + // supply a positive value to keep the config valid even though the + // migration manager is not in the data path. + keysToMigratePerBlock int + + // initialStores is passed to cs.Initialize. The fuzz workload only + // touches stores in this list when generating new keys. + initialStores []string + + // iterableStores is the set of stores whose data is reachable via a + // route that supports Iterator() in this mode. The workload restricts + // per-block Iterator calls to this set; deep inspection ignores it. + iterableStores map[string]bool + + // proofSupportingStores is the set of stores whose data is reachable + // via a route that supports GetProof() in this mode. Always a subset + // of iterableStores because only memiavl supplies ICS23 proofs and + // every memiavl-routed store also supports iteration. + proofSupportingStores map[string]bool + + // hasMemiavl reports whether cs.memIAVL is non-nil in this mode. + hasMemiavl bool + + // hasFlatKV reports whether cs.flatKV is non-nil in this mode. + hasFlatKV bool + + // isActiveMigration reports whether the migration manager is in the + // data path for this mode (MigrateEVM / MigrateAllButBank / + // MigrateBank). The CRUD and reopen suites must drive enough blocks + // for active-migration modes to complete migration before the + // end-of-test deep inspection runs. 
+ isActiveMigration bool + + // finalPlacement maps each module name to the backend its keys live + // on after this mode reaches its post-migration steady state. The + // deep inspector consults this map directly. For modes where + // hasMemiavl is false, every entry must be backendFlatKV; for modes + // where hasFlatKV is false, every entry must be backendMemiavl; + // for TestOnlyDualWrite, keys.EVMStoreKey is backendDualWriteEVM. + finalPlacement map[string]backendID +} + +// allModeProfiles returns the per-mode profile table consumed by the +// table-driven fuzz tests. Coverage: all 8 values of config.WriteMode. +// +// Initial stores are always keys.MemIAVLStoreKeys: cs.Initialize is a no-op +// when memiavl is nil (FlatKVOnly) and accepts the canonical set in every +// other mode. Migration metadata is mounted on flatkv where applicable, not +// on memiavl, so we do not include migration.MigrationStore here. +func allModeProfiles() []modeProfile { + allMem := makeBackendMap(keys.MemIAVLStoreKeys, backendMemiavl) + allFlat := makeBackendMap(keys.MemIAVLStoreKeys, backendFlatKV) + evmFlatRestMem := makeBackendMap(keys.MemIAVLStoreKeys, backendMemiavl) + evmFlatRestMem[keys.EVMStoreKey] = backendFlatKV + bankMemRestFlat := makeBackendMap(keys.MemIAVLStoreKeys, backendFlatKV) + bankMemRestFlat[keys.BankStoreKey] = backendMemiavl + dualWrite := makeBackendMap(keys.MemIAVLStoreKeys, backendMemiavl) + dualWrite[keys.EVMStoreKey] = backendDualWriteEVM + + allKeysSet := stringSliceToSet(keys.MemIAVLStoreKeys) + allButEVM := stringSliceToSet(keys.MemIAVLStoreKeys) + delete(allButEVM, keys.EVMStoreKey) + onlyBank := map[string]bool{keys.BankStoreKey: true} + empty := map[string]bool{} + + return []modeProfile{ + { + name: "MemiavlOnly", + writeMode: config.MemiavlOnly, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + iterableStores: allKeysSet, + proofSupportingStores: allKeysSet, + hasMemiavl: true, + hasFlatKV: false, + isActiveMigration: 
false, + finalPlacement: allMem, + }, + { + name: "MigrateEVM", + writeMode: config.MigrateEVM, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + // During the in-flight migration the EVM route is owned by + // the migration manager and does not expose Iterator; the + // suite drives enough blocks to complete migration and only + // then runs deep inspection. The per-block iterator sample + // stays on non-EVM modules throughout, where it is always + // safe. + iterableStores: allButEVM, + proofSupportingStores: allButEVM, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: true, + finalPlacement: evmFlatRestMem, + }, + { + name: "EVMMigrated", + writeMode: config.EVMMigrated, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + iterableStores: allButEVM, + proofSupportingStores: allButEVM, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: false, + finalPlacement: evmFlatRestMem, + }, + { + name: "MigrateAllButBank", + writeMode: config.MigrateAllButBank, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + // In MigrateAllButBank the migration manager owns every + // non-bank-non-evm module, EVM is on flatkv, and only bank + // stays on memiavl. Restrict per-block Iterator sampling to + // bank. 
+ iterableStores: onlyBank, + proofSupportingStores: onlyBank, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: true, + finalPlacement: bankMemRestFlat, + }, + { + name: "AllMigratedButBank", + writeMode: config.AllMigratedButBank, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + iterableStores: onlyBank, + proofSupportingStores: onlyBank, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: false, + finalPlacement: bankMemRestFlat, + }, + { + name: "MigrateBank", + writeMode: config.MigrateBank, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + // During the bank migration the bank route is owned by the + // migration manager and every other module routes to flatkv. + // Nothing is safely iterable. + iterableStores: empty, + proofSupportingStores: empty, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: true, + finalPlacement: allFlat, + }, + { + name: "FlatKVOnly", + writeMode: config.FlatKVOnly, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + iterableStores: empty, + proofSupportingStores: empty, + hasMemiavl: false, + hasFlatKV: true, + isActiveMigration: false, + finalPlacement: allFlat, + }, + { + name: "TestOnlyDualWrite", + writeMode: config.TestOnlyDualWrite, + keysToMigratePerBlock: 1024, + initialStores: keys.MemIAVLStoreKeys, + // Dual-write keeps every key in memiavl (the primary) and + // additionally mirrors evm keys to flatkv. Reads route + // through memiavl, so iteration is safe for every store. + iterableStores: allKeysSet, + proofSupportingStores: allKeysSet, + hasMemiavl: true, + hasFlatKV: true, + isActiveMigration: false, + finalPlacement: dualWrite, + }, + } +} + +// activeMigrationProfiles returns the subset of profiles whose router +// performs background data migration. Used by the parallel-replica +// state-sync test, which only applies to active-migration modes. 
+func activeMigrationProfiles() []modeProfile { + all := allModeProfiles() + out := make([]modeProfile, 0, 3) + for _, p := range all { + if p.isActiveMigration { + out = append(out, p) + } + } + return out +} + +// compositeOption mutates the StateCommitConfig that newCompositeForMode +// builds for the test's CompositeCommitStore. Tests that need behavior +// outside the defaults (e.g. dense snapshots for the exporter) pass +// option funcs to newCompositeForMode. +type compositeOption func(*config.StateCommitConfig) + +// withFlatKVSnapshotPerBlock forces flatkv to take a pebble checkpoint +// snapshot on every committed version. Required by tests that open the +// flatkv Exporter at a version below which the WAL is non-contiguous — +// most importantly TestCompositeFuzzStateSyncDuringMigration, where +// flatkv is seeded mid-test by a MemiavlOnly → MigrateEVM transition +// and its WAL therefore has no entries below the seed version. The +// readonly catchup loads committedVersion from the most recent +// snapshot's metadata DB, so a snapshot at or below the export version +// is required for the catchup to start at the seeded version rather +// than at 0. +// +// This option is expensive: every commit produces a new on-disk +// checkpoint (≈200 ms each in CI), so it is opt-in and only applied +// where the test design needs it. +func withFlatKVSnapshotPerBlock() compositeOption { + return func(cfg *config.StateCommitConfig) { + cfg.FlatKVConfig.SnapshotInterval = 1 + } +} + +// withSnapshotKeepRecent overrides SnapshotKeepRecent on both backends. +// The default (2) is too low for tests that need to roll back across +// many blocks: both memiavl.Rollback and flatkv.Rollback locate the +// snapshot at-or-below the rollback target, and a too-aggressively +// pruned snapshot makes the operation fail with "no snapshot found for +// target version N". 
+// +// Memiavl-only modes still set keep-recent on flatkv even though flatkv +// is not allocated, because the option is mode-agnostic; the unused +// config field is a no-op. +func withSnapshotKeepRecent(keep uint32) compositeOption { + return func(cfg *config.StateCommitConfig) { + cfg.FlatKVConfig.SnapshotKeepRecent = keep + cfg.MemIAVLConfig.SnapshotKeepRecent = keep + } +} + +// newCompositeForMode opens a CompositeCommitStore at dir for the given +// profile, with deterministic settings so reopen and exporter behavior is +// reproducible. Returns the loaded composite and registers no cleanup — +// callers manage Close themselves so the helper plays nicely with tests +// that reopen the same dir multiple times. +// +// Default tuning: +// +// - memiavl AsyncCommitBuffer=0 / SnapshotInterval=1: GetLatestVersion +// and the memiavl exporter observe on-disk state, which races with +// the async commit buffer; a per-block snapshot also makes +// historical Exporter calls trivial. The cost is negligible +// compared to flatkv snapshots. +// - flatkv SnapshotInterval is left at its default (10000 = no +// snapshots in any reasonably-sized test run). Tests whose WAL is +// contiguous from v=1 do not need flatkv snapshots and avoid the +// ≈200 ms-per-commit checkpoint cost by relying on WAL replay +// during readonly catchup. +// +// Per-test overrides come in as variadic compositeOption funcs. 
+func newCompositeForMode( + t *testing.T, + ctx context.Context, + dir string, + profile modeProfile, + opts ...compositeOption, +) *CompositeCommitStore { + t.Helper() + + cfg := config.DefaultStateCommitConfig() + cfg.WriteMode = profile.writeMode + cfg.KeysToMigratePerBlock = profile.keysToMigratePerBlock + + cfg.MemIAVLConfig.AsyncCommitBuffer = 0 + cfg.MemIAVLConfig.SnapshotInterval = 1 + cfg.MemIAVLConfig.SnapshotMinTimeInterval = 0 + + for _, opt := range opts { + opt(&cfg) + } + + cs, err := NewCompositeCommitStore(ctx, dir, cfg) + require.NoError(t, err, "NewCompositeCommitStore for %s", profile.name) + require.NoError(t, cs.Initialize(profile.initialStores), "Initialize for %s", profile.name) + + _, err = cs.LoadVersion(0, false) + require.NoError(t, err, "LoadVersion for %s", profile.name) + + // Mode-shape invariants. Cheap to check and surface mis-configured + // modes loudly at construction time. + if profile.hasMemiavl { + require.NotNil(t, cs.memIAVL, "%s must allocate memiavl", profile.name) + } else { + require.Nil(t, cs.memIAVL, "%s must not allocate memiavl", profile.name) + } + if profile.hasFlatKV { + require.NotNil(t, cs.flatKV, "%s must allocate flatkv", profile.name) + } else { + require.Nil(t, cs.flatKV, "%s must not allocate flatkv", profile.name) + } + + return cs +} + +// makeBackendMap returns a map from every module name in stores to b. +func makeBackendMap(stores []string, b backendID) map[string]backendID { + out := make(map[string]backendID, len(stores)) + for _, s := range stores { + out[s] = b + } + return out +} + +// stringSliceToSet returns ss as a set. 
+func stringSliceToSet(ss []string) map[string]bool { + out := make(map[string]bool, len(ss)) + for _, s := range ss { + out[s] = true + } + return out +} diff --git a/sei-db/state_db/sc/composite/fuzz_oracle_test.go b/sei-db/state_db/sc/composite/fuzz_oracle_test.go new file mode 100644 index 0000000000..64320dbdc2 --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_oracle_test.go @@ -0,0 +1,327 @@ +package composite + +import ( + "encoding/binary" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// oracleStore is the in-memory source of truth that the fuzz suite compares +// every read-path result against. Outer map keys are module/store names, +// inner map keys are stringified store keys. +// +// Live in the composite package's test binary; intentionally distinct from +// the package-internal TestInMemoryRouter that lives under migration/. +type oracleStore struct { + stores map[string]map[string][]byte +} + +// newOracleStore returns an empty oracle. +func newOracleStore() *oracleStore { + return &oracleStore{stores: make(map[string]map[string][]byte)} +} + +// Apply replays a slice of NamedChangeSets against the oracle in the same +// order they would be applied by the composite store. +func (o *oracleStore) Apply(changesets []*proto.NamedChangeSet) { + for _, ncs := range changesets { + if ncs == nil { + continue + } + storeMap, ok := o.stores[ncs.Name] + if !ok { + storeMap = make(map[string][]byte) + o.stores[ncs.Name] = storeMap + } + for _, pair := range ncs.Changeset.Pairs { + if pair == nil { + continue + } + if pair.Delete { + delete(storeMap, string(pair.Key)) + continue + } + // Copy the value: the oracle outlives the changeset slice + // and must not alias caller-owned memory. 
+ value := make([]byte, len(pair.Value)) + copy(value, pair.Value) + storeMap[string(pair.Key)] = value + } + } +} + +// Get returns the value the oracle holds for (store, key) and whether the +// key is present. +func (o *oracleStore) Get(store string, key []byte) ([]byte, bool) { + storeMap, ok := o.stores[store] + if !ok { + return nil, false + } + v, ok := storeMap[string(key)] + if !ok { + return nil, false + } + return v, true +} + +// Has is a thin wrapper around Get for symmetry with the composite API. +func (o *oracleStore) Has(store string, key []byte) bool { + _, ok := o.Get(store, key) + return ok +} + +// Snapshot returns a deep copy of the oracle. Used by the parallel-replica +// state-sync test to freeze the source-of-truth at the moment of export, +// then continue mutating each replica's oracle independently before +// reconciling. +func (o *oracleStore) Snapshot() *oracleStore { + out := newOracleStore() + for storeName, storeMap := range o.stores { + copyMap := make(map[string][]byte, len(storeMap)) + for k, v := range storeMap { + cv := make([]byte, len(v)) + copy(cv, v) + copyMap[k] = cv + } + out.stores[storeName] = copyMap + } + return out +} + +// keyPair identifies a single live key in the oracle by (store, key). +type keyPair struct { + store string + key string +} + +// liveKeySetFromOracle rebuilds a liveKeySet whose membership matches the +// oracle's current key map. Used by tests that snapshot an oracle, mutate +// it forward, then restore — they only need to clone the oracle (oracle +// already has Snapshot) and can recompute the liveKeySet from the clone. +// Iteration order is non-deterministic, but rng-driven sampling is the +// only consumer and it does not assume insertion order. 
+func liveKeySetFromOracle(o *oracleStore) *liveKeySet { + s := newLiveKeySet() + for storeName, storeMap := range o.stores { + for k := range storeMap { + s.Add(keyPair{store: storeName, key: k}) + } + } + return s +} + +// liveKeySet supports O(1) Add, O(1) Remove, and O(n) deterministic random +// sampling without relying on map iteration order. Mirrors the analogous +// helper used by the migration-package fuzz tests. Sampling is reproducible +// from the rng seed. +type liveKeySet struct { + keys []keyPair + idx map[keyPair]int +} + +func newLiveKeySet() *liveKeySet { + return &liveKeySet{idx: make(map[keyPair]int)} +} + +func (s *liveKeySet) Len() int { return len(s.keys) } + +func (s *liveKeySet) Add(kp keyPair) { + if _, ok := s.idx[kp]; ok { + return + } + s.idx[kp] = len(s.keys) + s.keys = append(s.keys, kp) +} + +func (s *liveKeySet) Remove(kp keyPair) { + i, ok := s.idx[kp] + if !ok { + return + } + last := len(s.keys) - 1 + if i != last { + s.keys[i] = s.keys[last] + s.idx[s.keys[i]] = i + } + s.keys = s.keys[:last] + delete(s.idx, kp) +} + +// Sample returns up to n distinct keyPairs uniformly at random using Floyd's +// algorithm. Output is reproducible from the rng's seed. +func (s *liveKeySet) Sample(r *testutil.TestRandom, n int) []keyPair { + population := len(s.keys) + if n > population { + n = population + } + if n == 0 { + return nil + } + chosen := make(map[int]struct{}, n) + out := make([]keyPair, 0, n) + for i := population - n; i < population; i++ { + j := r.Intn(i + 1) + if _, exists := chosen[j]; exists { + chosen[i] = struct{}{} + out = append(out, s.keys[i]) + } else { + chosen[j] = struct{}{} + out = append(out, s.keys[j]) + } + } + return out +} + +// randomKVPair returns a non-EVM 8-byte key/value pair. Used for every +// store except keys.EVMStoreKey. 
+func randomKVPair(rng *testutil.TestRandom) *proto.KVPair { + return &proto.KVPair{ + Key: rng.Bytes(8), + Value: rng.Bytes(8), + } +} + +// evmAddrPoolSize is the size of the deterministic EVM address pool the +// data generators draw from. Picked deliberately small (relative to the +// per-test write volume) so that random nonce / codeHash / code / storage +// draws land on the SAME address often enough that flatkv's accountDB +// merge path (nonce + codeHash combined into a single physical row) is +// stochastically exercised every run. With ~100 EVM writes per block and +// 32 addresses, ~3 nonce writes and ~3 codeHash writes share an address +// per block on average. +const evmAddrPoolSize = 32 + +// evmSlotPoolSize is the size of the storage-slot pool. Smaller than +// evmAddrPoolSize so that repeat writes to the same (addr, slot) tuple +// happen frequently — this exercises pendingWrites coalescing inside +// flatkv's storageDB path. +const evmSlotPoolSize = 8 + +// evmAddressFromPool returns a 20-byte EVM address drawn uniformly from a +// finite pool. The pool index lives in the high 4 bytes; the rest are +// zero. Deterministic for a given pool index so that nonces, code hashes +// and codes generated for the same index land on the same accountDB row. +func evmAddressFromPool(rng *testutil.TestRandom) []byte { + idx := uint32(rng.Intn(evmAddrPoolSize)) + addr := make([]byte, keys.AddressLen) + binary.BigEndian.PutUint32(addr[:4], idx) + return addr +} + +// evmSlotFromPool returns a 32-byte storage slot drawn uniformly from a +// finite pool. The pool index lives in the high 4 bytes; the rest are +// zero. Deterministic so that repeat draws hit the same flatkv storage +// row. 
+func evmSlotFromPool(rng *testutil.TestRandom) []byte { + idx := uint32(rng.Intn(evmSlotPoolSize)) + slot := make([]byte, 32) + binary.BigEndian.PutUint32(slot[:4], idx) + return slot +} + +// randomEVMKVPair returns a random but structurally valid EVM key-value +// pair for use with the keys.EVMStoreKey store. The four EVM key kinds +// (nonce, code hash, code, storage) are selected uniformly. Addresses +// and storage slots come from a small deterministic pool so that the +// suite frequently writes more than one field to the same accountDB row +// (exercising the nonce + codeHash merge path) and the same storage row +// (exercising slot-level coalescing). +func randomEVMKVPair(rng *testutil.TestRandom) *proto.KVPair { + addr := evmAddressFromPool(rng) + switch rng.Intn(4) { + case 0: + return &proto.KVPair{Key: keys.BuildEVMKey(keys.EVMKeyNonce, addr), Value: rng.Bytes(8)} + case 1: + return &proto.KVPair{Key: keys.BuildEVMKey(keys.EVMKeyCodeHash, addr), Value: rng.Bytes(32)} + case 2: + return &proto.KVPair{Key: keys.BuildEVMKey(keys.EVMKeyCode, addr), Value: rng.Bytes(32)} + default: + slot := evmSlotFromPool(rng) + stripped := append(addr, slot...) + return &proto.KVPair{Key: keys.BuildEVMKey(keys.EVMKeyStorage, stripped), Value: rng.Bytes(32)} + } +} + +// randomEVMValue returns a fresh random value of the correct length for the +// given EVM key. Used when updating an existing EVM key so the new value +// matches the on-disk length constraint for that key kind. 
+func randomEVMValue(rng *testutil.TestRandom, key []byte) []byte { + kind, _ := keys.ParseEVMKey(key) + switch kind { + case keys.EVMKeyNonce: + return rng.Bytes(8) + case keys.EVMKeyCodeHash, keys.EVMKeyCode, keys.EVMKeyStorage: + return rng.Bytes(32) + default: + return rng.Bytes(8) + } +} + +// oracleFlatkvShape converts the oracle map into the per-DB shape that +// flatkv would store the same data in: one accountDB row per address +// that has either a nonce OR a codeHash assignment, one codeDB row per +// address with a code assignment, one storageDB row per (addr, slot), +// and one legacyDB row per remaining (module, key). The returned struct +// is exactly what an end-of-test physical-count probe should expect to +// see on disk in flatkv (plus any migration-metadata rows, which the +// caller layers on separately). +// +// Important: the helper does NOT model flatkv's IsDelete pruning. The +// oracle only ever stores live keys, so this is correct as long as the +// composite test never artificially leaves a "tombstone-with-no-fields" +// row on disk — flatkv's own batch path deletes such rows on Commit, so +// every live oracle entry maps to at least the accountDB / codeDB / +// storageDB / legacyDB row computed here. +type oracleFlatkvShape struct { + accountRows int64 // unique addresses with a live nonce OR codeHash + codeRows int64 // unique addresses with a live code + storageRows int64 // unique (addr, slot) tuples with live storage + legacyRows int64 // remaining non-EVM keys + any legacy-shaped EVM data +} + +// total returns the sum of every per-DB row count. +func (s oracleFlatkvShape) total() int64 { + return s.accountRows + s.codeRows + s.storageRows + s.legacyRows +} + +// oracleFlatkvShapeFor partitions the oracle keys that belong to a +// flatkv-backed store (per profile.finalPlacement) into the per-DB shape +// flatkv would physically store them in. 
+// +// The set of "flatkv-backed" stores is determined by the caller passing +// the set of placements to count. For modes with a single backendID per +// store, that is just the store's placement; for TestOnlyDualWrite the +// EVM store is also counted because the dual-write router mirrors EVM +// data into flatkv. +func oracleFlatkvShapeFor(oracle *oracleStore, includeStore func(store string) bool) oracleFlatkvShape { + var shape oracleFlatkvShape + for storeName, storeMap := range oracle.stores { + if !includeStore(storeName) { + continue + } + if storeName == keys.EVMStoreKey { + addAccount := make(map[string]struct{}) + addCode := make(map[string]struct{}) + for k := range storeMap { + kind, stripped := keys.ParseEVMKey([]byte(k)) + switch kind { + case keys.EVMKeyNonce, keys.EVMKeyCodeHash: + addAccount[string(stripped)] = struct{}{} + case keys.EVMKeyCode: + addCode[string(stripped)] = struct{}{} + case keys.EVMKeyStorage: + shape.storageRows++ + case keys.EVMKeyLegacy: + shape.legacyRows++ + } + } + shape.accountRows += int64(len(addAccount)) + shape.codeRows += int64(len(addCode)) + continue + } + shape.legacyRows += int64(len(storeMap)) + } + return shape +} diff --git a/sei-db/state_db/sc/composite/fuzz_reopen_test.go b/sei-db/state_db/sc/composite/fuzz_reopen_test.go new file mode 100644 index 0000000000..78ae1a62fb --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_reopen_test.go @@ -0,0 +1,94 @@ +package composite + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" +) + +// TestCompositeFuzzReopenAllModes exercises the close + reopen path of +// the CompositeCommitStore across every config.WriteMode. 
Each mode runs +// a randomized workload spanning multiple close/reopen cycles against the +// same on-disk directory, verifying: +// +// - per-block oracle equivalence is maintained across reopen cycles; +// - cs.GetLatestVersion immediately after each reopen agrees with the +// last pre-close cs.Version; +// - state surviving the reopen matches the oracle on Get / Has; +// - end-of-test deep inspection of the nested backends after the final +// reopen (no phantom rows on either side, every key on the backend +// the mode dictates, migration metadata in the expected state). +// +// Active-migration modes have enough total workload across the cycles to +// drive migration to completion before the final deep inspection. +func TestCompositeFuzzReopenAllModes(t *testing.T) { + const ( + blocksPerCycle = 30 + reopenCycles = 3 + ) + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + var cs *CompositeCommitStore + for cycle := 0; cycle < reopenCycles; cycle++ { + cs = newCompositeForMode(t, t.Context(), dir, profile) + + // After the first cycle the just-loaded store must + // already be at the last pre-close committed version. + expectedStart := int64(cycle * blocksPerCycle) + require.Equal(t, expectedStart, cs.Version(), + "%s cycle=%d: cs.Version on reopen must equal pre-close version", + profile.name, cycle) + latest, err := cs.GetLatestVersion() + require.NoError(t, err) + require.Equal(t, expectedStart, latest, + "%s cycle=%d: GetLatestVersion on reopen must equal cs.Version", + profile.name, cycle) + + // Drive blocksPerCycle blocks of workload, numbering + // them so simulateBlocksOnComposite asserts on + // monotonically-increasing versions across all cycles. 
+ opts := defaultWorkloadOpts(blocksPerCycle) + opts.startingBlock = int(expectedStart) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, opts) + + // Sanity: post-cycle version is what we expect. + expectedEnd := int64((cycle + 1) * blocksPerCycle) + require.Equal(t, expectedEnd, cs.Version(), + "%s cycle=%d: cs.Version after cycle must equal expected end", + profile.name, cycle) + + // Oracle equivalence survives a Close-Reopen if we + // run the check before close. We do; deep inspection + // is reserved for after the final reopen. + verifyReadsEqual(t, cs, oracle) + + require.NoError(t, cs.Close(), + "%s cycle=%d: Close must not error", profile.name, cycle) + } + + // Final reopen for end-of-test inspection. This is the + // "post-restart steady state" the deep inspection cares + // about: every backend is opened from disk, no in-memory + // state survives. + cs = newCompositeForMode(t, t.Context(), dir, profile) + defer func() { _ = cs.Close() }() + + finalVersion := int64(reopenCycles * blocksPerCycle) + require.Equal(t, finalVersion, cs.Version(), + "%s: cs.Version after final reopen must equal cumulative workload size", + profile.name) + + verifyReadsEqual(t, cs, oracle) + deepInspectPlacement(t, cs, oracle, profile) + }) + } +} diff --git a/sei-db/state_db/sc/composite/fuzz_rollback_test.go b/sei-db/state_db/sc/composite/fuzz_rollback_test.go new file mode 100644 index 0000000000..c629736a56 --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_rollback_test.go @@ -0,0 +1,346 @@ +package composite + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/migration" +) + +// TestCompositeFuzzRollback exercises cs.Rollback for every WriteMode in +// steady state (i.e. 
the rollback target and the post-rollback continuation +// are both inside the same WriteMode and, for active-migration modes, both +// past migration completion). +// +// Per mode the test: +// +// 1. Drives a random workload forward to version M1. For active-migration +// modes the default KeysToMigratePerBlock=1024 completes migration in +// a handful of blocks, so M1=30 is comfortably post-migration. The +// oracle at M1 is snapshotted so we can compare reads after the +// rollback. +// 2. Continues the workload forward to version M2. +// 3. Calls cs.Rollback(M1). +// 4. Asserts cs.Version() == M1, LastCommitInfo.Version == M1, and that +// cs.Get / cs.Has agree with the M1-oracle for every live key. For +// active-migration modes the migration metadata at M1 (version key +// present, boundary key absent) must also match. +// 5. Continues a fresh workload forward to version M3 against the M1 +// oracle clone. End-of-test deepInspectPlacement guarantees no +// phantom rows survived the rollback and no oracle key was lost. +// +// Scope: rollback never crosses a WriteMode change — the entire test +// runs in one mode against one CompositeCommitStore — and for the active +// modes the rollback target is post-completion, so the boundary-key / +// in-flight-iterator state is not in play here. The mid-migration +// rollback case is the responsibility of +// TestCompositeFuzzRollbackDuringMigration below. +func TestCompositeFuzzRollback(t *testing.T) { + const ( + preRollbackBlocks = 30 // forward to M1 + postRollbackBlocks = 30 // forward to M2, then roll back to M1 + postContinueBlocks = 30 // M1 -> M3 after rollback + writesAfterRollback = 10 // smaller per-block volume in the + readsAfterRollback = 10 // continuation; the strong invariants + newKeysAfterRollback = 10 // are at M1 and the deep inspection. 
+ deletesAfterRollback = 2 + ) + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + cs := newCompositeForMode(t, t.Context(), dir, profile) + defer func() { _ = cs.Close() }() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + // Phase 1: forward to M1. + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, + defaultWorkloadOpts(preRollbackBlocks)) + rollbackVersion := cs.Version() + require.Equal(t, int64(preRollbackBlocks), rollbackVersion, + "%s: pre-rollback version", profile.name) + + // Capture the state we expect cs to be in after the rollback. + oracleAtRollback := oracle.Snapshot() + + // Phase 2: forward to M2 (these blocks are discarded by + // the rollback). + opts := defaultWorkloadOpts(postRollbackBlocks) + opts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, profile, rng, opts) + require.Equal(t, int64(preRollbackBlocks+postRollbackBlocks), cs.Version(), + "%s: pre-rollback version after discarded blocks", profile.name) + + // Phase 3: rollback. + require.NoError(t, cs.Rollback(rollbackVersion), + "%s: cs.Rollback(%d)", profile.name, rollbackVersion) + require.Equal(t, rollbackVersion, cs.Version(), + "%s: cs.Version after Rollback", profile.name) + lci := cs.LastCommitInfo() + require.NotNil(t, lci, "%s: LastCommitInfo after Rollback", profile.name) + require.Equal(t, rollbackVersion, lci.Version, + "%s: LastCommitInfo.Version after Rollback", profile.name) + + // Phase 4: verify reads against the M1 oracle. + verifyReadsEqual(t, cs, oracleAtRollback) + + // For active-migration modes M1 is past completion, so the + // version key must still be on disk and the boundary key + // must still be absent (rollback to a post-completion + // version must not re-introduce the in-flight boundary). 
+ if profile.isActiveMigration { + require.True(t, migrationVersionKeyPresent(cs), + "%s: MigrationVersionKey must remain present after rolling back to post-completion version", + profile.name) + require.False(t, migrationBoundaryPresent(cs), + "%s: MigrationBoundaryKey must remain absent after rolling back to post-completion version", + profile.name) + } + + // Phase 5: continue forward with the M1 oracle. Use the + // derived liveKeySet (membership only — rng-driven + // sampling does not assume insertion order). + postOracle := oracleAtRollback.Snapshot() + postKeys := liveKeySetFromOracle(postOracle) + postOpts := defaultWorkloadOpts(postContinueBlocks) + postOpts.startingBlock = int(cs.Version()) + 1 + postOpts.updatesPerBlock = writesAfterRollback + postOpts.readsPerBlock = readsAfterRollback + postOpts.newKeysPerBlock = newKeysAfterRollback + postOpts.deletesPerBlock = deletesAfterRollback + simulateBlocksOnComposite(t, cs, postOracle, postKeys, profile, rng, postOpts) + + deepInspectPlacement(t, cs, postOracle, profile) + }) + } +} + +// TestCompositeFuzzRollbackDuringMigration exercises rollback across the +// migration-completion boundary inside a single active-migration mode. +// The migration manager owns persistent metadata (MigrationVersionKey, +// MigrationBoundaryKey) on flatkv, so rolling back from "past completion" +// to "mid-migration" must: +// +// - restore the on-disk boundary key to whatever it was at the +// mid-migration target version; +// - remove the on-disk version key (it must not survive a rollback to +// before completion); +// - bring memiavl + flatkv data back to the exact state they were in at +// the mid-migration version, so that resumed migration converges +// idempotently. +// +// Scope: rollback never crosses a WriteMode change. The cs is kept in +// the target active-migration mode for the whole test; only the position +// of the migration boundary changes. 
+// +// The 3 active-migration modes (MigrateEVM, MigrateAllButBank, MigrateBank) +// each get their own sub-test, with priming through their prior +// active-migration phases via the same ladder +// TestCompositeFuzzStateSyncDuringMigration uses. +// +// --------------------------------------------------------------------- +// +// BLOCKED: this test reproduces a production-code defect in +// composite.CompositeCommitStore.Rollback and is therefore skipped at +// the top until the defect is fixed. See the in-test t.Skipf below for +// the failing assertions and the inferred root cause; nothing else +// about the test setup needs to change once the manager's in-memory +// boundary/iterator are re-initialized on rollback. +func TestCompositeFuzzRollbackDuringMigration(t *testing.T) { + t.Skipf("blocked on composite.CompositeCommitStore.Rollback not re-initializing " + + "MigrationManager in-memory state (boundary, iterator). " + + "After cs.Rollback drops memiavl+flatkv back across the migration " + + "completion point, the manager's in-memory boundary stays at " + + "MigrationBoundaryComplete from the discard-phase commits. Every " + + "subsequent cs.Get for a now-unmigrated key routes through " + + "boundary.IsMigrated -> newDBReader (flatkv), where the key no " + + "longer exists, and returns (nil, false). Every subsequent " + + "cs.ApplyChangeSets takes the boundary.Equals(MigrationBoundaryComplete) " + + "early-return and writes straight to flatkv, silently skipping " + + "the migration that the on-disk boundary key says is still in " + + "flight. Reproducer / failing assertion: this exact test, with " + + "the t.Skipf removed; verifyReadsEqual fails immediately after " + + "the Rollback call (line 269) for every active-migration mode.") + const ( + primingBlocksMemiavlOnly = 15 + priorMigrationMaxBlocks = 40 + // Blocks of slow-rate target-mode workload before the + // rollback target is captured. 
Need at least 1 so that the + // boundary key is on disk; 2 makes the in-flight position + // less degenerate. + preRollbackBlocks = 3 + // Blocks driven past the rollback target before issuing the + // rollback. Sized to comfortably exceed the slow-rate drain + // time so migration completes in this window for all three + // modes. + discardedBlocks = 80 + // Final convergence cap. The post-rollback drain has the + // same per-block dynamics as the original migration, so this + // just needs to exceed (memiavl-key-count / slow-rate). + postRollbackMaxBlocks = 200 + ) + + // snapshotKeepRecent must comfortably exceed the longest rollback + // distance the test performs. With primingBlocksMemiavlOnly + + // 2*priorMigrationMaxBlocks (worst case, MigrateBank target) + + // preRollbackBlocks before the rollback target, plus discardedBlocks + // + postRollbackMaxBlocks after it, 256 leaves ample room. + const snapshotKeepRecent = 256 + openOpts := []compositeOption{ + withFlatKVSnapshotPerBlock(), + withSnapshotKeepRecent(snapshotKeepRecent), + } + + for _, profile := range activeMigrationProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + dir := t.TempDir() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + // ---- Phase A: prime MemiavlOnly ---- + // + // openOpts is applied to every reopen below: + // + // * withFlatKVSnapshotPerBlock: once flatkv is + // initialized (Phase B onward), its Rollback path + // closes the DBs and replays the WAL from the + // most recent snapshot to the rollback target. + // The MemiavlOnly → MigrateEVM transition seeds + // flatkv at memiavl.Version+1, leaving the flatkv + // WAL with no entries below that seed version; + // without a snapshot at the seeded version, the + // rollback catchup starts at v=0 and rejects the + // seeded-WAL prefix as a "hole". 
+ // * withSnapshotKeepRecent: the rollback target's + // snapshot must still be present on disk when + // Rollback is called, after ~discardedBlocks of + // subsequent commits would otherwise have pruned + // it under the default keep-recent=2. + cs := newCompositeForMode(t, t.Context(), dir, lookupProfile("MemiavlOnly"), + openOpts...) + simulateBlocksOnComposite(t, cs, oracle, keysInUse, + lookupProfile("MemiavlOnly"), rng, + defaultWorkloadOpts(primingBlocksMemiavlOnly)) + require.NoError(t, cs.Close()) + + // ---- Phase B: complete every prior active migration ---- + for _, prior := range priorActiveModes(profile.writeMode) { + priorProfile := lookupProfile(prior) + cs := newCompositeForMode(t, t.Context(), dir, priorProfile, + openOpts...) + + opts := defaultWorkloadOpts(priorMigrationMaxBlocks) + opts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, priorProfile, rng, opts) + require.True(t, migrationCompleteFor(cs, priorProfile.writeMode), + "%s: prior phase %s did not complete in %d blocks", + profile.name, prior, priorMigrationMaxBlocks) + + require.NoError(t, cs.Close()) + } + + // ---- Phase C: open target mode at slow rate, drive a + // few blocks so the boundary is mid-flight ---- + slowTarget := profile + slowTarget.keysToMigratePerBlock = slowKeysPerBlockFor(profile.writeMode) + cs = newCompositeForMode(t, t.Context(), dir, slowTarget, openOpts...) 
+ defer func() { _ = cs.Close() }() + + opts := defaultWorkloadOpts(preRollbackBlocks) + opts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, slowTarget, rng, opts) + + require.False(t, migrationCompleteFor(cs, profile.writeMode), + "%s: migration unexpectedly already complete after %d slow-rate blocks", + profile.name, preRollbackBlocks) + require.True(t, migrationBoundaryPresent(cs), + "%s: MigrationBoundaryKey must be present mid-migration", profile.name) + + rollbackVersion := cs.Version() + oracleAtRollback := oracle.Snapshot() + + // ---- Phase D: drive past completion ---- + opts = defaultWorkloadOpts(discardedBlocks) + opts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, slowTarget, rng, opts) + require.True(t, migrationCompleteFor(cs, profile.writeMode), + "%s: migration must complete within %d discarded blocks (slow rate %d/block)", + profile.name, discardedBlocks, slowTarget.keysToMigratePerBlock) + + // ---- Phase E: rollback to mid-migration ---- + require.NoError(t, cs.Rollback(rollbackVersion), + "%s: cs.Rollback(%d) across migration completion", profile.name, rollbackVersion) + require.Equal(t, rollbackVersion, cs.Version(), + "%s: cs.Version after rollback across completion", profile.name) + require.True(t, migrationBoundaryPresent(cs), + "%s: MigrationBoundaryKey must be restored after rolling back into mid-migration", + profile.name) + // Use migrationCompleteFor (value-aware) rather than a + // plain "version key present" check: for the + // non-first active migrations (MigrateAllButBank / + // MigrateBank) the version key already holds the + // prior migration's target value at the rollback + // target, so "absent" is not the correct mid-flight + // invariant. The correct invariant for every active + // mode is "the version key does not equal this + // migration's target", i.e. !migrationCompleteFor. 
+ require.False(t, migrationCompleteFor(cs, profile.writeMode), + "%s: migration must not appear complete after rolling back to before its completion (version key still equals target %d)", + profile.name, targetMigrationVersion(profile.writeMode)) + + verifyReadsEqual(t, cs, oracleAtRollback) + + // ---- Phase F: resume migration; assert it completes + // idempotently and ends in the same placement as before + // the rollback ---- + postOracle := oracleAtRollback.Snapshot() + postKeys := liveKeySetFromOracle(postOracle) + postOpts := defaultWorkloadOpts(1) + postOpts.readsPerBlock = 10 + postOpts.iteratorReadsPerBlock = 0 + postOpts.proofReadsPerBlock = 0 + postOpts.newKeysPerBlock = 10 + postOpts.updatesPerBlock = 10 + postOpts.deletesPerBlock = 2 + + var blocksDriven int + for b := 0; b < postRollbackMaxBlocks; b++ { + blocksDriven = b + 1 + blockOpts := postOpts + blockOpts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, postOracle, postKeys, slowTarget, rng, blockOpts) + if migrationCompleteFor(cs, profile.writeMode) { + break + } + } + require.Less(t, blocksDriven, postRollbackMaxBlocks, + "%s: post-rollback migration failed to converge within %d blocks", + profile.name, postRollbackMaxBlocks) + require.True(t, migrationCompleteFor(cs, profile.writeMode), + "%s: post-rollback migration must complete", profile.name) + + deepInspectPlacement(t, cs, postOracle, profile) + }) + } +} + +// migrationVersionKeyPresent reports whether MigrationVersionKey is +// currently persisted on cs.flatKV. Distinct from migrationCompleteFor: +// this helper only asks "is a version key present", regardless of which +// migration mode value it holds. Used by the rollback tests which only +// need a yes/no answer at the metadata layer. 
+func migrationVersionKeyPresent(cs *CompositeCommitStore) bool { + if cs.flatKV == nil { + return false + } + _, ok := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationVersionKey)) + return ok +} diff --git a/sei-db/state_db/sc/composite/fuzz_state_sync_during_migration_test.go b/sei-db/state_db/sc/composite/fuzz_state_sync_during_migration_test.go new file mode 100644 index 0000000000..8465043fda --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_state_sync_during_migration_test.go @@ -0,0 +1,364 @@ +package composite + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/migration" +) + +// TestCompositeFuzzStateSyncDuringMigration exercises the "snapshot taken +// mid-migration, restored into a second instance, both finish migrating +// from the same point" contract for every active-migration mode +// (MigrateEVM, MigrateAllButBank, MigrateBank). +// +// Per mode, the test: +// +// 1. Primes a source directory through every prior migration phase so it +// ends up in the disk layout expected by the target mode (e.g. for +// MigrateBank: MemiavlOnly → MigrateEVM (complete) → MigrateAllButBank +// (complete)). The oracle is populated throughout so verification at +// the end covers data written in every phase. +// +// 2. Opens the source in the target mode with a deliberately small +// KeysToMigratePerBlock so the migration spans many blocks. Drives a +// handful of blocks to be sure the boundary is mid-migration when the +// snapshot is taken. +// +// 3. Takes an Exporter snapshot, drains it. +// +// 4. Opens a fresh destination directory in the target mode, replays the +// snapshot through its Importer, and reloads the destination at the +// source version. +// +// 5. 
Drives an identical workload — generated once from a shared rng — on +// both src and dst block by block. After every block: +// +// - both stores must commit the same expected version (cs.Version()); +// - a sample of live keys must match the oracle via Get on both. +// +// Continues until both stores' migrations complete (MigrationVersionKey +// equal to the mode's target on both). A safety cap guards against +// infinite loops if migration cannot converge. +// +// 6. End-of-test deep inspection on both src and dst. +// +// This is the strongest invariant the suite asserts about state sync: +// equality of the post-import migration behavior with the pre-export +// behavior, viewed through the migration boundary. +func TestCompositeFuzzStateSyncDuringMigration(t *testing.T) { + const ( + // Priming controls (per phase). + primingBlocksMemiavlOnly = 15 + // Prior-migration completion budget. With KeysToMigratePerBlock + // at the default (1024) every prior migration drains in well + // under this many blocks even after the heaviest priming. + priorMigrationMaxBlocks = 40 + // Blocks driven on src in the target mode before snapshotting. + // We want migration to be in-flight (boundary present), not yet + // complete (version key not yet at the target value). + preSnapshotBlocks = 2 + // Safety cap for the parallel phase. The expected number of + // blocks is bounded by (uncompleted-key-count / slow-rate) + // plus the time needed for the migration manager to outpace + // new "ahead-of-boundary" writes that show up each block; 200 + // is generous given the priming sizes and per-mode slow rates. 
+ parallelMaxBlocks = 200 + ) + + for _, profile := range activeMigrationProfiles() { + t.Run(profile.name, func(t *testing.T) { + sharedRng := testutil.NewTestRandom() + srcDir := t.TempDir() + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + + // ---- Phase A: prime memiavl in MemiavlOnly mode so that + // when we transition to the target migration mode, + // memiavl has data to migrate and flatkv has not yet + // been written to. flatkv's WAL therefore starts at the + // version of the first MigrateEVM (or first prior phase) + // commit, and the catchup performed by the read-only + // exporter opens from a snapshot rather than from + // version 1, so the "no WAL entries below the seed + // version" condition does not surface as a WAL hole. + // + // withFlatKVSnapshotPerBlock is applied to every reopen + // in this test: as soon as flatkv exists (Phase B and + // later), the readonly Exporter catchup must be able to + // load a snapshot whose metadata DB carries the seeded + // version, otherwise catchup starts at v=0 and rejects + // the seeded-but-no-prefix WAL. 
+ cs := newCompositeForMode(t, t.Context(), srcDir, lookupProfile("MemiavlOnly"), + withFlatKVSnapshotPerBlock()) + simulateBlocksOnComposite(t, cs, oracle, keysInUse, + lookupProfile("MemiavlOnly"), sharedRng, + defaultWorkloadOpts(primingBlocksMemiavlOnly)) + require.NoError(t, cs.Close()) + + // ---- Phase B: complete every prior migration mode in order ---- + for _, prior := range priorActiveModes(profile.writeMode) { + priorProfile := lookupProfile(prior) + cs := newCompositeForMode(t, t.Context(), srcDir, priorProfile, + withFlatKVSnapshotPerBlock()) + + opts := defaultWorkloadOpts(priorMigrationMaxBlocks) + opts.startingBlock = int(cs.Version()) + 1 + simulateBlocksOnComposite(t, cs, oracle, keysInUse, priorProfile, sharedRng, opts) + + require.True(t, migrationCompleteFor(cs, priorProfile.writeMode), + "%s: prior phase %s did not complete its migration in %d blocks", + profile.name, prior, priorMigrationMaxBlocks) + + require.NoError(t, cs.Close()) + } + + // ---- Phase C: open target mode at a slow migration rate; + // drive a few blocks to be sure migration is in-flight ---- + slowTarget := profile + slowTarget.keysToMigratePerBlock = slowKeysPerBlockFor(profile.writeMode) + + src := newCompositeForMode(t, t.Context(), srcDir, slowTarget, + withFlatKVSnapshotPerBlock()) + opts := defaultWorkloadOpts(preSnapshotBlocks) + opts.startingBlock = int(src.Version()) + 1 + simulateBlocksOnComposite(t, src, oracle, keysInUse, slowTarget, sharedRng, opts) + + require.False(t, migrationCompleteFor(src, profile.writeMode), + "%s: migration unexpectedly already complete after %d slow-rate blocks", + profile.name, preSnapshotBlocks) + require.True(t, migrationBoundaryPresent(src), + "%s: MigrationBoundaryKey must be present mid-migration", profile.name) + + snapshotVersion := src.Version() + + // ---- Phase D: snapshot src, import into dst ---- + exporter, err := src.Exporter(snapshotVersion) + require.NoError(t, err, "%s: src.Exporter", profile.name) + items := 
fuzzDrainExporter(t, exporter) + require.NoError(t, exporter.Close()) + + // Close src then reopen so that any Exporter-internal state + // is released before we start the parallel write phase. + require.NoError(t, src.Close()) + src = newCompositeForMode(t, t.Context(), srcDir, slowTarget, + withFlatKVSnapshotPerBlock()) + require.Equal(t, snapshotVersion, src.Version(), + "%s: src.Version after reopen must equal snapshot version", profile.name) + defer func() { _ = src.Close() }() + + dstDir := t.TempDir() + dst := newCompositeForMode(t, t.Context(), dstDir, slowTarget, + withFlatKVSnapshotPerBlock()) + require.NoError(t, dst.Close(), "%s: dst pre-import Close", profile.name) + + importer, err := dst.Importer(snapshotVersion) + require.NoError(t, err, "%s: dst.Importer", profile.name) + fuzzReplayImport(t, importer, items) + require.NoError(t, importer.Close()) + + _, err = dst.LoadVersion(snapshotVersion, false) + require.NoError(t, err, "%s: dst.LoadVersion at imported version", profile.name) + defer func() { _ = dst.Close() }() + + require.Equal(t, snapshotVersion, dst.Version(), + "%s: dst.Version after import must equal snapshot version", profile.name) + verifyReadsEqual(t, dst, oracle) + + require.False(t, migrationCompleteFor(dst, profile.writeMode), + "%s: dst unexpectedly past migration completion immediately after import", + profile.name) + require.True(t, migrationBoundaryPresent(dst), + "%s: dst MigrationBoundaryKey must be present after importing a mid-migration snapshot", + profile.name) + + // ---- Phase E: drive identical workload on both until both + // complete migration ---- + parallelOpts := defaultWorkloadOpts(1) + // Keep the per-block read sample lightweight; deep checks + // happen at the end. 
+ parallelOpts.readsPerBlock = 10 + parallelOpts.iteratorReadsPerBlock = 0 + parallelOpts.proofReadsPerBlock = 0 + + var blocksDriven int + for b := 0; b < parallelMaxBlocks; b++ { + blocksDriven = b + 1 + ops := generateBlockOps(slowTarget, keysInUse, sharedRng, parallelOpts) + expectedVersion := snapshotVersion + int64(blocksDriven) + + applyBlockOpsTo(t, src, ops, expectedVersion) + applyBlockOpsTo(t, dst, ops, expectedVersion) + + oracle.Apply(ops.changesets) + for _, kp := range ops.addedKeys { + keysInUse.Add(kp) + } + for _, kp := range ops.removedKeys { + keysInUse.Remove(kp) + } + + require.Equal(t, src.Version(), dst.Version(), + "%s parallel-block=%d: src and dst versions diverged", + profile.name, blocksDriven) + + // Per-block sampled read parity against the oracle. + for _, kp := range keysInUse.Sample(sharedRng, parallelOpts.readsPerBlock) { + expected, expectedOK := oracle.Get(kp.store, []byte(kp.key)) + + sv, sOK, err := src.Get(kp.store, []byte(kp.key)) + require.NoError(t, err, + "%s parallel-block=%d: src.Get store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + require.Equal(t, expectedOK, sOK, + "%s parallel-block=%d: src.Get found mismatch store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + require.Equal(t, expected, sv, + "%s parallel-block=%d: src.Get value mismatch store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + + dv, dOK, err := dst.Get(kp.store, []byte(kp.key)) + require.NoError(t, err, + "%s parallel-block=%d: dst.Get store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + require.Equal(t, expectedOK, dOK, + "%s parallel-block=%d: dst.Get found mismatch store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + require.Equal(t, expected, dv, + "%s parallel-block=%d: dst.Get value mismatch store=%q key=%x", + profile.name, blocksDriven, kp.store, []byte(kp.key)) + } + + if migrationCompleteFor(src, profile.writeMode) && 
migrationCompleteFor(dst, profile.writeMode) { + break + } + } + + require.Less(t, blocksDriven, parallelMaxBlocks, + "%s: migration failed to converge within parallelMaxBlocks=%d", + profile.name, parallelMaxBlocks) + require.True(t, migrationCompleteFor(src, profile.writeMode), + "%s: src migration must complete in the parallel phase", profile.name) + require.True(t, migrationCompleteFor(dst, profile.writeMode), + "%s: dst migration must complete in the parallel phase", profile.name) + + // ---- Phase F: end-of-test verification ---- + verifyReadsEqual(t, src, oracle) + verifyReadsEqual(t, dst, oracle) + deepInspectPlacement(t, src, oracle, profile) + deepInspectPlacement(t, dst, oracle, profile) + }) + } +} + +// lookupProfile returns the modeProfile whose name field equals name. +// Panics on miss; callers pass literal mode names. +func lookupProfile(name string) modeProfile { + for _, p := range allModeProfiles() { + if p.name == name { + return p + } + } + panic("unknown mode profile name: " + name) +} + +// slowKeysPerBlockFor returns a per-mode migration rate small enough to +// keep the target migration mid-flight after preSnapshotBlocks blocks, +// but large enough to converge in the parallel phase against the +// stochastic "new memiavl write rate" produced by the workload (which +// adds to the backlog whenever the new key is ahead of the boundary). +// +// Per-mode reasoning: +// +// - MigrateEVM: only EVM data is in memiavl after Phase A (the +// TestOnlyDualWrite priming). With newKeysPerBlock=100 spread across +// ~20 modules, priming produces only ~75 EVM key writes, and the +// finite EVM address pool then collapses those down to ~50–70 unique +// logical keys. A rate of 10/block keeps the boundary in flight +// across preSnapshotBlocks while still draining in <10 blocks during +// the parallel phase. +// - MigrateBank: bank is the only module in memiavl after the prior +// migration phases complete. 
Over Phase A + 2 prior phases that is +// hundreds of bank keys, so 25/block stays mid-flight at 2 blocks +// and converges well within the parallel cap. +// - MigrateAllButBank: 18 non-EVM-non-bank modules' worth of data +// remains in memiavl. The slow rate must clear ~18× the new +// ahead-of-boundary write rate per block, hence 100. +func slowKeysPerBlockFor(mode config.WriteMode) int { + switch mode { + case config.MigrateEVM: + return 10 + case config.MigrateBank: + return 25 + case config.MigrateAllButBank: + return 100 + default: + panic("slowKeysPerBlockFor: not an active-migration mode") + } +} + +// priorActiveModes returns the active-migration modes that must be +// completed before opening target. Order is the migration-version order +// the production migration ladder walks. +func priorActiveModes(target config.WriteMode) []string { + switch target { + case config.MigrateEVM: + return nil + case config.MigrateAllButBank: + return []string{"MigrateEVM"} + case config.MigrateBank: + return []string{"MigrateEVM", "MigrateAllButBank"} + default: + panic("priorActiveModes: not an active-migration mode") + } +} + +// targetMigrationVersion returns the on-disk MigrationVersionKey value +// the migration manager writes when the given active-migration mode +// completes. +func targetMigrationVersion(mode config.WriteMode) uint64 { + switch mode { + case config.MigrateEVM: + return migration.Version1_MigrateEVM + case config.MigrateAllButBank: + return migration.Version2_MigrateAllButBank + case config.MigrateBank: + return migration.Version3_FlatKVOnly + default: + panic("targetMigrationVersion: not an active-migration mode") + } +} + +// migrationCompleteFor reports whether cs.flatKV has +// MigrationVersionKey set to the target version of mode. Necessary — +// rather than a plain "version key present" check — because +// completing a prior migration also leaves a version key in flatkv +// (e.g. 
MigrateEVM writes value=1; subsequent MigrateAllButBank phase +// has its own target of 2). +func migrationCompleteFor(cs *CompositeCommitStore, mode config.WriteMode) bool { + if cs.flatKV == nil { + return false + } + raw, ok := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationVersionKey)) + if !ok || len(raw) != 8 { + return false + } + return binary.BigEndian.Uint64(raw) == targetMigrationVersion(mode) +} + +// migrationBoundaryPresent reports whether MigrationBoundaryKey is +// currently persisted on cs.flatKV. The migration manager writes it on +// the first commit after start and removes it on the final block. Used +// to confirm a snapshot was actually taken mid-migration. +func migrationBoundaryPresent(cs *CompositeCommitStore) bool { + if cs.flatKV == nil { + return false + } + _, ok := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationBoundaryKey)) + return ok +} diff --git a/sei-db/state_db/sc/composite/fuzz_state_sync_test.go b/sei-db/state_db/sc/composite/fuzz_state_sync_test.go new file mode 100644 index 0000000000..511da0c67d --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_state_sync_test.go @@ -0,0 +1,98 @@ +package composite + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" +) + +// TestCompositeFuzzStateSyncAllModes drives a randomized state-sync +// round-trip for every config.WriteMode: +// +// 1. populate a source CompositeCommitStore with N blocks of workload; +// 2. open an exporter at the source's current version, drain it; +// 3. open a fresh destination store in the same mode, replay the export +// stream through its importer, and reload it at the source's version; +// 4. verify the destination's Get / Has agree with the oracle for every +// live key (the snapshot transferred a faithful copy); +// 5. 
resume the same workload schedule against the destination for an +// additional M blocks and verify the destination keeps converging +// with the oracle; +// 6. run end-of-test deep inspection of the destination's nested +// backends. +// +// Active-migration modes drive enough source-side blocks to finish +// migration before exporting, so the destination receives a post-migration +// snapshot. This isolates the round-trip from any mid-migration resume +// behavior, which is exercised separately by +// TestCompositeFuzzStateSyncDuringMigration. +func TestCompositeFuzzStateSyncAllModes(t *testing.T) { + const ( + srcBlocks = 100 + dstBlocks = 30 + ) + + for _, profile := range allModeProfiles() { + t.Run(profile.name, func(t *testing.T) { + rng := testutil.NewTestRandom() + + // ---- Source ---- + srcDir := t.TempDir() + src := newCompositeForMode(t, t.Context(), srcDir, profile) + + oracle := newOracleStore() + keysInUse := newLiveKeySet() + simulateBlocksOnComposite(t, src, oracle, keysInUse, profile, rng, + defaultWorkloadOpts(srcBlocks)) + + require.Equal(t, int64(srcBlocks), src.Version(), + "%s: pre-export src.Version must equal blocks driven", profile.name) + + exporter, err := src.Exporter(int64(srcBlocks)) + require.NoError(t, err, "%s: src.Exporter", profile.name) + items := fuzzDrainExporter(t, exporter) + require.NoError(t, exporter.Close()) + require.NoError(t, src.Close(), "%s: src.Close", profile.name) + + // ---- Destination ---- + dstDir := t.TempDir() + dst := newCompositeForMode(t, t.Context(), dstDir, profile) + // Close prior to opening the importer: existing + // store_test.go tests do the same — the importer claims + // ownership of the underlying backends, so the writable + // handle must be released first. 
+ require.NoError(t, dst.Close(), "%s: dst pre-import Close", profile.name) + + importer, err := dst.Importer(int64(srcBlocks)) + require.NoError(t, err, "%s: dst.Importer", profile.name) + fuzzReplayImport(t, importer, items) + require.NoError(t, importer.Close(), "%s: importer.Close", profile.name) + + _, err = dst.LoadVersion(int64(srcBlocks), false) + require.NoError(t, err, "%s: dst.LoadVersion at imported version", profile.name) + defer func() { _ = dst.Close() }() + + require.Equal(t, int64(srcBlocks), dst.Version(), + "%s: dst.Version after import must equal source version", profile.name) + latest, err := dst.GetLatestVersion() + require.NoError(t, err) + require.Equal(t, int64(srcBlocks), latest, + "%s: dst.GetLatestVersion after import must equal source version", profile.name) + + verifyReadsEqual(t, dst, oracle) + + // ---- Resume workload on destination ---- + opts := defaultWorkloadOpts(dstBlocks) + opts.startingBlock = srcBlocks + 1 + simulateBlocksOnComposite(t, dst, oracle, keysInUse, profile, rng, opts) + + require.Equal(t, int64(srcBlocks+dstBlocks), dst.Version(), + "%s: post-resume dst.Version mismatch", profile.name) + + verifyReadsEqual(t, dst, oracle) + deepInspectPlacement(t, dst, oracle, profile) + }) + } +} diff --git a/sei-db/state_db/sc/composite/fuzz_workload_test.go b/sei-db/state_db/sc/composite/fuzz_workload_test.go new file mode 100644 index 0000000000..ffc85d9087 --- /dev/null +++ b/sei-db/state_db/sc/composite/fuzz_workload_test.go @@ -0,0 +1,532 @@ +package composite + +import ( + "errors" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + errorutils "github.com/sei-protocol/sei-chain/sei-db/common/errors" + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/migration" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" +) + +// 
workloadOpts controls the per-block volume of the fuzz workload. +type workloadOpts struct { + readsPerBlock int + updatesPerBlock int + deletesPerBlock int + newKeysPerBlock int + blocks int + // iteratorReadsPerBlock and proofReadsPerBlock are best-effort: + // they are only exercised on stores in profile.iterableStores / + // profile.proofSupportingStores. Set to 0 to disable. + iteratorReadsPerBlock int + proofReadsPerBlock int + // startingBlock is the block number of the first block driven by + // this call (1-based). Used so successive calls to + // simulateBlocksOnComposite against the same composite can assert + // monotonically-increasing versions. + startingBlock int +} + +// defaultWorkloadOpts returns a reasonable starting point for an N-block +// run. Most tests pass blocks=N and accept the defaults for the rest; +// individual fields can be overridden as needed. +func defaultWorkloadOpts(blocks int) workloadOpts { + return workloadOpts{ + readsPerBlock: 50, + updatesPerBlock: 50, + deletesPerBlock: 10, + newKeysPerBlock: 100, + blocks: blocks, + iteratorReadsPerBlock: 3, + proofReadsPerBlock: 3, + startingBlock: 1, + } +} + +// blockOps captures the randomized changes scheduled for a single block, +// pre-applied to anything. Returned by generateBlockOps so the same +// schedule can be replayed against multiple CompositeCommitStores in +// lockstep (see TestCompositeFuzzStateSyncDuringMigration). +// +// addedKeys and removedKeys are the keyPair deltas that must be applied +// to the workload's liveKeySet once the changesets have been applied to +// every store the schedule is being replayed against. +type blockOps struct { + changesets []*proto.NamedChangeSet + addedKeys []keyPair + removedKeys []keyPair +} + +// generateBlockOps generates a single block of randomized work: a mix of +// inserts, updates, and deletes, distributed across profile.initialStores. 
+// The function does not mutate keysInUse so the caller decides exactly +// when to fold the deltas in (e.g. only after applying to every store in +// a lockstep run). Iteration over the per-store buckets is performed in +// sorted name order so the returned slice is byte-stable for a fixed +// rng seed. +func generateBlockOps(profile modeProfile, keysInUse *liveKeySet, rng *testutil.TestRandom, opts workloadOpts) blockOps { + allPairs := make(map[string][]*proto.KVPair) + var added []keyPair + + for i := 0; i < opts.newKeysPerBlock; i++ { + store := profile.initialStores[rng.Intn(len(profile.initialStores))] + var pair *proto.KVPair + if store == keys.EVMStoreKey { + pair = randomEVMKVPair(rng) + } else { + pair = randomKVPair(rng) + } + allPairs[store] = append(allPairs[store], pair) + added = append(added, keyPair{store: store, key: string(pair.Key)}) + } + + for _, kp := range keysInUse.Sample(rng, opts.updatesPerBlock) { + var value []byte + if kp.store == keys.EVMStoreKey { + value = randomEVMValue(rng, []byte(kp.key)) + } else { + value = rng.Bytes(8) + } + allPairs[kp.store] = append(allPairs[kp.store], + &proto.KVPair{Key: []byte(kp.key), Value: value}) + } + + removed := keysInUse.Sample(rng, opts.deletesPerBlock) + for _, kp := range removed { + allPairs[kp.store] = append(allPairs[kp.store], + &proto.KVPair{Key: []byte(kp.key), Delete: true}) + } + + storeNames := make([]string, 0, len(allPairs)) + for store := range allPairs { + storeNames = append(storeNames, store) + } + sort.Strings(storeNames) + out := make([]*proto.NamedChangeSet, 0, len(allPairs)) + for _, store := range storeNames { + out = append(out, &proto.NamedChangeSet{ + Name: store, + Changeset: proto.ChangeSet{Pairs: allPairs[store]}, + }) + } + return blockOps{changesets: out, addedKeys: added, removedKeys: removed} +} + +// applyBlockOpsTo applies ops's changesets to cs, commits, and asserts +// every reported version equals expectedVersion. 
Caller is responsible for any per-block read +// sampling and for folding ops.addedKeys / ops.removedKeys into the +// workload's liveKeySet. +func applyBlockOpsTo(t *testing.T, cs *CompositeCommitStore, ops blockOps, expectedVersion int64) { + t.Helper() + require.NoError(t, cs.ApplyChangeSets(ops.changesets), + "block %d: ApplyChangeSets", expectedVersion) + version, err := cs.Commit() + require.NoError(t, err, "block %d: Commit", expectedVersion) + require.Equal(t, expectedVersion, version, + "block %d: Commit must return expected version", expectedVersion) + require.Equal(t, expectedVersion, cs.Version(), + "block %d: cs.Version must agree with Commit's return", expectedVersion) + lci := cs.LastCommitInfo() + require.NotNil(t, lci, "block %d: LastCommitInfo must not be nil", expectedVersion) + require.Equal(t, expectedVersion, lci.Version, + "block %d: LastCommitInfo.Version must agree with cs.Version", expectedVersion) +} + +// simulateBlocksOnComposite drives a randomized workload against cs and +// mirrors every write to oracle so the two stay in lockstep. After every +// block the helper: +// +// - asserts cs.Commit() returned the expected (== block-number) +// version and that cs.LastCommitInfo().Version agrees with cs.Version(); +// - samples reads through cs.Get / cs.Has and verifies they match the +// oracle for the same keys; +// - additionally samples Iterator / GetProof on stores in the per-mode +// capability sets when the corresponding *PerBlock count is > 0. +// +// All randomness is sourced from rng, so the same seed produces the +// byte-identical apply / commit sequence across runs. 
+func simulateBlocksOnComposite( + t *testing.T, + cs *CompositeCommitStore, + oracle *oracleStore, + keysInUse *liveKeySet, + profile modeProfile, + rng *testutil.TestRandom, + opts workloadOpts, +) { + t.Helper() + + iterableList := setToSortedSlice(profile.iterableStores) + proofList := setToSortedSlice(profile.proofSupportingStores) + + for b := 0; b < opts.blocks; b++ { + blockNumber := int64(opts.startingBlock + b) + ops := generateBlockOps(profile, keysInUse, rng, opts) + + applyBlockOpsTo(t, cs, ops, blockNumber) + oracle.Apply(ops.changesets) + for _, kp := range ops.addedKeys { + keysInUse.Add(kp) + } + for _, kp := range ops.removedKeys { + keysInUse.Remove(kp) + } + + // 4) Per-block read sample via Get / Has. Verifies oracle + // equivalence on a small set of live keys each block. + for _, kp := range keysInUse.Sample(rng, opts.readsPerBlock) { + expected, expectedOK := oracle.Get(kp.store, []byte(kp.key)) + gotVal, gotOK, err := cs.Get(kp.store, []byte(kp.key)) + require.NoError(t, err, "block %d: cs.Get store=%q", blockNumber, kp.store) + require.Equal(t, expectedOK, gotOK, + "block %d: cs.Get found mismatch on store=%q key=%x", blockNumber, kp.store, []byte(kp.key)) + require.Equal(t, expected, gotVal, + "block %d: cs.Get value mismatch on store=%q key=%x", blockNumber, kp.store, []byte(kp.key)) + + hasOK, err := cs.Has(kp.store, []byte(kp.key)) + require.NoError(t, err, "block %d: cs.Has store=%q", blockNumber, kp.store) + require.Equal(t, gotOK, hasOK, + "block %d: cs.Has must agree with cs.Get on store=%q key=%x", + blockNumber, kp.store, []byte(kp.key)) + } + + // 5) Per-block Iterator sample on stores that route to a backend + // that supports iteration (typically memiavl). The dbm.Iterator + // contract demands non-nil start/end; we pass {0x00} and {0xFF} + // as wide bounds; only a few entries are walked anyway. 
+ if opts.iteratorReadsPerBlock > 0 && len(iterableList) > 0 { + for i := 0; i < opts.iteratorReadsPerBlock; i++ { + store := iterableList[rng.Intn(len(iterableList))] + iter, err := cs.Iterator(store, []byte{0x00}, []byte{0xFF}, true) + require.NoError(t, err, "block %d: cs.Iterator store=%q", blockNumber, store) + require.NotNil(t, iter, "block %d: cs.Iterator returned nil iterator for store=%q", + blockNumber, store) + // Walk a couple of entries to make sure the iterator + // is functional; deeper iteration is exercised by the + // end-of-test placement walk. + for j := 0; j < 3 && iter.Valid(); j++ { + iter.Next() + } + require.NoError(t, iter.Error(), "block %d: iterator error store=%q", blockNumber, store) + require.NoError(t, iter.Close()) + } + } + + // 6) Per-block GetProof sample. Only meaningful for memiavl-routed + // stores; verifies the proof builder does not error and returns a + // non-nil proof for keys that exist. + if opts.proofReadsPerBlock > 0 && len(proofList) > 0 { + for i := 0; i < opts.proofReadsPerBlock; i++ { + // Pick a random live key in a proof-supporting store. If + // the sampled key is not in such a store, skip this slot. + sample := keysInUse.Sample(rng, 1) + if len(sample) == 0 { + continue + } + kp := sample[0] + if !profile.proofSupportingStores[kp.store] { + continue + } + proof, err := cs.GetProof(kp.store, []byte(kp.key)) + require.NoError(t, err, + "block %d: cs.GetProof store=%q key=%x", blockNumber, kp.store, []byte(kp.key)) + require.NotNil(t, proof, + "block %d: cs.GetProof returned nil proof for store=%q key=%x", + blockNumber, kp.store, []byte(kp.key)) + } + } + } +} + +// verifyReadsEqual asserts cs.Get and cs.Has agree with oracle for every +// live oracle key. Caller-owned cs and oracle must reflect the same +// committed state; the routine performs no commits of its own. 
+func verifyReadsEqual(t *testing.T, cs *CompositeCommitStore, oracle *oracleStore) { + t.Helper() + for storeName, storeMap := range oracle.stores { + for k, expected := range storeMap { + key := []byte(k) + gotVal, gotOK, err := cs.Get(storeName, key) + require.NoError(t, err, "cs.Get store=%q key=%x", storeName, key) + require.True(t, gotOK, + "cs.Get not found for store=%q key=%x (oracle has it)", storeName, key) + require.Equal(t, expected, gotVal, + "cs.Get value mismatch on store=%q key=%x", storeName, key) + hasOK, err := cs.Has(storeName, key) + require.NoError(t, err, "cs.Has store=%q key=%x", storeName, key) + require.True(t, hasOK, + "cs.Has not found for store=%q key=%x", storeName, key) + } + } +} + +// flatKVPhysicalKeyCount returns the count of physical rows present in +// cs.flatKV across every data DB. Pending (uncommitted) writes are not +// included. Panics if cs.flatKV is nil; callers must guard. +// +// "Physical" means raw DB rows in flatkv's underlying pebbledbs. flatkv +// merges nonce + codeHash for the same address into one accountDB row, +// so this can underreport vs. a naive "count of logical keys" view. +// Callers compare against oracleFlatkvShapeFor (which models that +// merging) rather than against a flat logical count. +func flatKVPhysicalKeyCount(t *testing.T, cs *CompositeCommitStore) int64 { + t.Helper() + require.NotNil(t, cs.flatKV, "flatKVPhysicalKeyCount called on a mode with no flatkv backend") + iter := cs.flatKV.RawGlobalIterator() + defer func() { _ = iter.Close() }() + var count int64 + for ok := iter.First(); ok; ok = iter.Next() { + count++ + } + require.NoError(t, iter.Error()) + return count +} + +// memIAVLPhysicalKeyCount returns the total number of keys stored across +// every tree in cs.memIAVL. Panics if cs.memIAVL is nil; callers must +// guard. 
+func memIAVLPhysicalKeyCount(t *testing.T, cs *CompositeCommitStore) int64 { + t.Helper() + require.NotNil(t, cs.memIAVL, "memIAVLPhysicalKeyCount called on a mode with no memiavl backend") + var total int64 + for _, namedTree := range cs.memIAVL.GetDB().Trees() { + iter := namedTree.Iterator(nil, nil, true) + for ; iter.Valid(); iter.Next() { + total++ + } + require.NoError(t, iter.Error(), "memiavl tree %q iterator error", namedTree.Name) + _ = iter.Close() + } + return total +} + +// deepInspectPlacement performs end-of-test verification of the nested +// memiavl + flatkv contents against the oracle, using profile.finalPlacement +// to decide which backend each oracle key should live in. +// +// Per-key invariants: +// +// - For every oracle (store, key, value) where the placement is +// backendMemiavl: the key must be present in cs.memIAVL with the +// oracle's value and (when flatkv exists) absent from cs.flatKV. +// - For backendFlatKV: present in cs.flatKV with the oracle's value and +// (when memiavl exists) absent from cs.memIAVL. +// - For backendDualWriteEVM (TestOnlyDualWrite EVM keys): present in +// both backends with the oracle's value. +// +// Physical-count invariants (no phantom rows): +// +// - cs.memIAVL physical key count == oracle logical keys assigned to +// backendMemiavl plus oracle logical keys assigned to backendDualWriteEVM +// (the latter is the mirror copy that dual-write keeps in memiavl). +// - cs.flatKV physical row count == sum over flatkv-backed stores of the +// per-DB row counts oracleFlatkvShapeFor models (account merging, +// storage tuple uniqueness, code per-address, legacy per-key), plus +// 1 if MigrationVersionKey is on disk. +// +// For active-migration modes additionally asserts MigrationVersionKey is +// present on flatkv and MigrationBoundaryKey is absent — i.e. migration +// completed during the test. 
+func deepInspectPlacement(t *testing.T, cs *CompositeCommitStore, oracle *oracleStore, profile modeProfile) { + t.Helper() + + memMem := int64(0) + dualEVM := int64(0) + + for storeName, storeMap := range oracle.stores { + placement, ok := profile.finalPlacement[storeName] + require.True(t, ok, + "deepInspectPlacement: oracle contains store %q with no entry in profile.finalPlacement", + storeName) + for k, expected := range storeMap { + key := []byte(k) + switch placement { + case backendMemiavl: + memMem++ + assertMemiavlHas(t, cs, storeName, key, expected) + if profile.hasFlatKV { + assertFlatkvAbsent(t, cs, storeName, key) + } + case backendFlatKV: + assertFlatkvHas(t, cs, storeName, key, expected) + if profile.hasMemiavl { + assertMemiavlAbsent(t, cs, storeName, key) + } + case backendDualWriteEVM: + dualEVM++ + assertMemiavlHas(t, cs, storeName, key, expected) + assertFlatkvHas(t, cs, storeName, key, expected) + default: + t.Fatalf("deepInspectPlacement: unknown backendID %v for store %q", placement, storeName) + } + } + } + + // Migration-completion / no-completion checks. + hasVersionKey := false + if profile.hasFlatKV { + _, hasVersionKey = cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationVersionKey)) + _, hasBoundary := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationBoundaryKey)) + if profile.isActiveMigration { + require.True(t, hasVersionKey, + "%s: MigrationVersionKey must be present on flatkv after migration completes", + profile.name) + require.False(t, hasBoundary, + "%s: MigrationBoundaryKey must be absent on flatkv after migration completes", + profile.name) + } else { + // Steady-state-only routers never write migration metadata. + // FlatKVOnly is one of those modes; so are MemiavlOnly, + // EVMMigrated, AllMigratedButBank and TestOnlyDualWrite. + // In our fresh-start fuzz tests there is no upstream + // migration to inherit metadata from, so the version and + // boundary keys must both be absent. 
+ require.False(t, hasVersionKey, + "%s: MigrationVersionKey must be absent (steady-state router does not write it)", + profile.name) + require.False(t, hasBoundary, + "%s: MigrationBoundaryKey must be absent (steady-state router does not write it)", + profile.name) + } + } + + // Physical-count invariants. + if profile.hasMemiavl { + expected := memMem + dualEVM + got := memIAVLPhysicalKeyCount(t, cs) + require.Equal(t, expected, got, + "%s: memiavl physical key count mismatch (expected oracle-derived %d, got %d)", + profile.name, expected, got) + } + if profile.hasFlatKV { + shape := oracleFlatkvShapeFor(oracle, func(store string) bool { + placement := profile.finalPlacement[store] + return placement == backendFlatKV || placement == backendDualWriteEVM + }) + extra := int64(0) + if hasVersionKey { + extra = 1 + } + expected := shape.total() + extra + got := flatKVPhysicalKeyCount(t, cs) + require.Equal(t, expected, got, + "%s: flatkv physical row count mismatch (expected oracle-derived %d "+ + "[account=%d code=%d storage=%d legacy=%d +%d migration]; got %d)", + profile.name, expected, + shape.accountRows, shape.codeRows, shape.storageRows, shape.legacyRows, + extra, got) + } +} + +// assertMemiavlHas verifies a key/value exists in cs.memIAVL. +func assertMemiavlHas(t *testing.T, cs *CompositeCommitStore, store string, key, expected []byte) { + t.Helper() + require.NotNil(t, cs.memIAVL, "assertMemiavlHas called with nil memiavl") + child := cs.memIAVL.GetChildStoreByName(store) + require.NotNil(t, child, "memiavl child store missing for %q", store) + got := child.Get(key) + require.NotNil(t, got, "memiavl missing key store=%q key=%x", store, key) + require.Equal(t, expected, got, "memiavl value mismatch store=%q key=%x", store, key) +} + +// assertMemiavlAbsent verifies a key is not present in cs.memIAVL. 
+func assertMemiavlAbsent(t *testing.T, cs *CompositeCommitStore, store string, key []byte) { + t.Helper() + require.NotNil(t, cs.memIAVL, "assertMemiavlAbsent called with nil memiavl") + child := cs.memIAVL.GetChildStoreByName(store) + if child == nil { + // Tree might not exist on memiavl in this mode (e.g. a store + // that was created on flatkv only). Treat as absent. + return + } + got := child.Get(key) + require.Nil(t, got, "memiavl unexpectedly has key store=%q key=%x", store, key) +} + +// assertFlatkvHas verifies a key/value exists in cs.flatKV. +func assertFlatkvHas(t *testing.T, cs *CompositeCommitStore, store string, key, expected []byte) { + t.Helper() + require.NotNil(t, cs.flatKV, "assertFlatkvHas called with nil flatkv") + got, ok := cs.flatKV.Get(store, key) + require.True(t, ok, "flatkv missing key store=%q key=%x", store, key) + require.Equal(t, expected, got, "flatkv value mismatch store=%q key=%x", store, key) +} + +// assertFlatkvAbsent verifies a key is not present in cs.flatKV. +func assertFlatkvAbsent(t *testing.T, cs *CompositeCommitStore, store string, key []byte) { + t.Helper() + require.NotNil(t, cs.flatKV, "assertFlatkvAbsent called with nil flatkv") + _, ok := cs.flatKV.Get(store, key) + require.False(t, ok, "flatkv unexpectedly has key store=%q key=%x", store, key) +} + +// setToSortedSlice returns a deterministic sorted slice of a string set's +// keys. Iteration order of Go maps is not stable, so the workload uses +// this helper to keep its rng consumption reproducible. +func setToSortedSlice(s map[string]bool) []string { + out := make([]string, 0, len(s)) + for k := range s { + out = append(out, k) + } + sort.Strings(out) + return out +} + +// ============================================================================= +// State-sync drain / replay helpers (composite-local copies, intentionally +// renamed to avoid colliding with the same-shape helpers defined in +// store_test.go). 
+// ============================================================================= + +// fuzzExportItem holds one item produced by a types.Exporter. Exactly one +// of moduleName / node is populated per item. +type fuzzExportItem struct { + moduleName string + node *types.SnapshotNode +} + +// fuzzDrainExporter collects every item the exporter yields in stream +// order, stopping at the first errorutils.ErrorExportDone. Any other +// error fails the test. +func fuzzDrainExporter(t *testing.T, exp types.Exporter) []fuzzExportItem { + t.Helper() + var items []fuzzExportItem + for { + raw, err := exp.Next() + if err != nil { + require.True(t, errors.Is(err, errorutils.ErrorExportDone), + "unexpected exporter error: %v", err) + break + } + switch v := raw.(type) { + case string: + items = append(items, fuzzExportItem{moduleName: v}) + case *types.SnapshotNode: + items = append(items, fuzzExportItem{node: v}) + default: + t.Fatalf("unexpected exporter item type %T", raw) + } + } + return items +} + +// fuzzReplayImport feeds a drained exporter stream into imp in the same +// order. +func fuzzReplayImport(t *testing.T, imp types.Importer, items []fuzzExportItem) { + t.Helper() + for _, it := range items { + if it.moduleName != "" { + require.NoError(t, imp.AddModule(it.moduleName), + "importer AddModule %q", it.moduleName) + } else { + imp.AddNode(it.node) + } + } +}