diff --git a/sei-db/db_engine/litt/README.md b/sei-db/db_engine/litt/README.md index 33a863de69..061203e615 100644 --- a/sei-db/db_engine/litt/README.md +++ b/sei-db/db_engine/litt/README.md @@ -14,8 +14,8 @@ - [Configuration Options](#configuration-options) - [CLI](#littdb-cli) - [Definitions](#definitions) -- [Architecture](docsrchitecture.md) -- [Filesystem Layout](docsilesystem_layout.md) +- [Architecture](docs/architecture.md) +- [Filesystem Layout](docs/filesystem_layout.md) # What is LittDB? @@ -107,10 +107,10 @@ Source: [db.go](db.go) ```go type DB interface { -GetTable(name string) (Table, error) -DropTable(name string) error -Stop() error -Destroy() error + GetTable(name string) (Table, error) + DropTable(name string) error + Stop() error + Destroy() error } ``` @@ -118,27 +118,45 @@ Source: [table.go](table.go) ```go type Table interface { -Name() string -Put(key []byte, value []byte) error -PutBatch(batch []*types.KVPair) error -Get(key []byte) ([]byte, bool, error) -Exists(key []byte) (bool, error) -Flush() error -Size() uint64 -SetTTL(ttl time.Duration) error -SetCacheSize(size uint64) error + Name() string + Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error + PutBatch(batch []*types.PutRequest) error + Get(key []byte) ([]byte, bool, error) + Exists(key []byte) (bool, error) + Flush() error + Size() uint64 + SetTTL(ttl time.Duration) error + SetCacheSize(size uint64) error } ``` -Source: [kv_pair.go](types/kv_pair.go) +Both primary keys and secondary keys must not exceed 64 KiB (2^16 - 1 bytes). Values may be up to 2^32 bytes. +Source: [put_request.go](types/put_request.go) + +```go +type PutRequest struct { + Key []byte + Value []byte + SecondaryKeys []*SecondaryKey // optional, may be nil +} ``` -type KVPair struct { - Key []byte - Value []byte + +Source: [secondary_key.go](types/secondary_key.go) + +```go +type SecondaryKey struct { + Key []byte // a globally unique key alias + Offset uint32 // byte offset into the parent value + Length uint32 // length of the byte range (Offset+Length <= len(Value)) } ``` +A secondary key is a first-class key that aliases a sub-range (or the whole) of the parent value's +bytes. `Get`, `Exists`, `KeyCount`, and TTL all treat secondary keys identically to primary keys. +Secondary keys share the value's bytes on disk, so each one costs roughly one keymap entry rather +than duplicating value bytes. + ## Getting Started Below is a functional example showing how to use LittDB. @@ -147,17 +165,17 @@ Below is a functional example showing how to use LittDB. // Configure and build the database. config, err := littbuilder.DefaultConfig("path/to/where/data/is/stored") if err != nil { -return err + return err } db, err := config.Build(context.Background()) if err != nil { -return err + return err } myTable, err := db.GetTable("my-table") // this code works if the table is new or if the table already exists if err != nil { -return err + return err } // Write a key-value pair to the table. @@ -166,13 +184,13 @@ value := []byte("this is a value") err = myTable.Put(key, value) if err != nil { -return err + return err } // Flush the data to disk. err = myTable.Flush() if err != nil { -return err + return err } // Congratulations! Your data is now durable on disk. @@ -180,7 +198,7 @@ return err // Read the value back. This works before or after a flush. val, ok, err := myTable.Get(key) if err != nil { -return err + return err } ``` diff --git a/sei-db/db_engine/litt/benchmark/data_tracker.go b/sei-db/db_engine/litt/benchmark/data_tracker.go index e72e8e7478..fa9c965e61 100644 --- a/sei-db/db_engine/litt/benchmark/data_tracker.go +++ b/sei-db/db_engine/litt/benchmark/data_tracker.go @@ -8,6 +8,7 @@ import ( "os" "path" "strings" + "sync" "time" "github.com/sei-protocol/sei-chain/sei-db/common/unit" @@ -94,9 +95,19 @@ type DataTracker struct { // The size of the values in bytes for new cohorts. valueSize uint64 - // This channel has capacity one and initially has one value in it. This value is drained when the DataTracker is - // fully stopped. Other threads can use this to block until the DataTracker is fully stopped. - closedChan chan struct{} + // Closed by Close() to signal the data generator goroutine to drain any pending + // reports from writtenKeyIndicesChan and then exit. This is what makes Close() + // deterministic with respect to ReportWrite: any ReportWrite call that has + // returned before Close() is invoked is guaranteed to be processed by the time + // Close() returns. + shutdownChan chan struct{} + + // Closed by the data generator goroutine when it has fully exited. Used by + // Close() to wait for the goroutine to finish. + doneChan chan struct{} + + // Ensures Close() is idempotent. + closeOnce sync.Once // Used to handle fatal errors in the DataTracker. errorMonitor *util.ErrorMonitor @@ -165,9 +176,6 @@ func NewDataTracker( safetyMargin := time.Duration(config.ReadSafetyMarginMinutes * float64(time.Minute)) safeTTL := ttl - safetyMargin - closedChan := make(chan struct{}, 1) - closedChan <- struct{}{} // Will be drained when the DataTracker is closed. - ctx, cancel := context.WithCancel(ctx) tracker := &DataTracker{ @@ -190,7 +198,8 @@ func NewDataTracker( safeTTL: safeTTL, valueSize: valueSize, generator: NewDataGenerator(config.Seed, config.RandomPoolSize), - closedChan: closedChan, + shutdownChan: make(chan struct{}), + doneChan: make(chan struct{}), errorMonitor: errorMonitor, } @@ -298,12 +307,17 @@ func (t *DataTracker) GetWriteInfo() *WriteInfo { } // ReportWrite is called when a key has been written to the database. This means that the key is now safe to be read. +// Reports submitted before Close() is invoked are guaranteed to be processed by the time Close() returns. +// Once shutdown has been initiated (via Close() or context cancellation), subsequent ReportWrite calls return +// immediately without queuing the report. func (t *DataTracker) ReportWrite(index uint64) { select { case t.writtenKeyIndicesChan <- index: return case <-t.ctx.Done(): return + case <-t.shutdownChan: + return } } @@ -331,11 +345,24 @@ func (t *DataTracker) GetReadInfoWithTimeout(timeout time.Duration) *ReadInfo { } } -// Close stops the key manager's background tasks. +// Close stops the key manager's background tasks. Close is idempotent. +// +// Close performs a graceful shutdown: any ReportWrite call that returned before Close was invoked +// is guaranteed to be processed (and any cohort completion side effects persisted) by the time +// Close returns. After Close starts, additional ReportWrite calls return immediately without +// queuing the report. func (t *DataTracker) Close() { - t.cancel() - t.closedChan <- struct{}{} - <-t.closedChan + t.closeOnce.Do(func() { + // Signal the data generator goroutine to drain pending reports and exit. Without this + // step, cancelling the context first could cause the goroutine to exit before + // processing reports that ReportWrite already enqueued, leaving on-disk cohort + // completion state inconsistent with what the caller has reported. + close(t.shutdownChan) + <-t.doneChan + // Cancel the context to unblock any callers still waiting in + // GetWriteInfo/GetReadInfo/GetReadInfoWithTimeout. + t.cancel() + }) } // dataGenerator is responsible for generating data in the background. @@ -343,7 +370,7 @@ func (t *DataTracker) dataGenerator() { ticker := time.NewTicker(time.Duration(t.config.CohortGCPeriodSeconds * float64(time.Second))) defer func() { ticker.Stop() - <-t.closedChan + close(t.doneChan) }() nextWriteInfo := t.generateNextWriteInfo() @@ -360,6 +387,10 @@ func (t *DataTracker) dataGenerator() { return case <-t.ctx.Done(): return + case <-t.shutdownChan: + // graceful shutdown initiated by Close() + t.drainPendingReports() + return case keyIndex := <-t.writtenKeyIndicesChan: // track keys that have been written so that we can read them in the future t.handleWrittenKey(keyIndex) @@ -381,6 +412,10 @@ func (t *DataTracker) dataGenerator() { return case <-t.ctx.Done(): return + case <-t.shutdownChan: + // graceful shutdown initiated by Close() + t.drainPendingReports() + return case keyIndex := <-t.writtenKeyIndicesChan: // track keys that have been written so that we can read them in the future t.handleWrittenKey(keyIndex) @@ -398,6 +433,26 @@ func (t *DataTracker) dataGenerator() { } } +// drainPendingReports processes any reports currently buffered in writtenKeyIndicesChan. +// Called by the data generator goroutine during graceful shutdown so that any ReportWrite +// call that returned before Close() is fully processed (i.e. cohort completion state +// persisted to disk reflects those reports). +// +// This is a best-effort drain of the channel buffer at the moment the goroutine sees the +// shutdown signal. ReportWrite stops accepting new submissions once shutdownChan is closed +// (and Close itself runs after all caller-side ReportWrite calls have returned), so by the +// time we get here the buffer is bounded and no new items can be enqueued. +func (t *DataTracker) drainPendingReports() { + for { + select { + case keyIndex := <-t.writtenKeyIndicesChan: + t.handleWrittenKey(keyIndex) + default: + return + } + } +} + // handleWrittenKey handles a key that has been written to the database. func (t *DataTracker) handleWrittenKey(keyIndex uint64) { // Add key index to the set of written keys we are tracking. diff --git a/sei-db/db_engine/litt/dbcache/cached_table.go b/sei-db/db_engine/litt/dbcache/cached_table.go index f434195ff1..935594c02e 100644 --- a/sei-db/db_engine/litt/dbcache/cached_table.go +++ b/sei-db/db_engine/litt/dbcache/cached_table.go @@ -51,22 +51,28 @@ func (c *cachedTable) Name() string { return c.base.Name() } -func (c *cachedTable) Put(key []byte, value []byte) error { - err := c.base.Put(key, value) +func (c *cachedTable) Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error { + err := c.base.Put(key, value, secondaryKeys...) if err != nil { return fmt.Errorf("failed to put entry into base table: %w", err) } c.writeCache.Put(string(key), value) + for _, sk := range secondaryKeys { + c.writeCache.Put(string(sk.Key), value[sk.Offset:sk.Offset+sk.Length]) + } return nil } -func (c *cachedTable) PutBatch(batch []*types.KVPair) error { +func (c *cachedTable) PutBatch(batch []*types.PutRequest) error { err := c.base.PutBatch(batch) if err != nil { return err } - for _, kv := range batch { - c.writeCache.Put(util.UnsafeBytesToString(kv.Key), kv.Value) + for _, req := range batch { + c.writeCache.Put(util.UnsafeBytesToString(req.Key), req.Value) + for _, sk := range req.SecondaryKeys { + c.writeCache.Put(util.UnsafeBytesToString(sk.Key), req.Value[sk.Offset:sk.Offset+sk.Length]) + } } return nil } diff --git a/sei-db/db_engine/litt/disktable/control_loop_messages.go b/sei-db/db_engine/litt/disktable/control_loop_messages.go index f6d18a9d00..e11152ad14 100644 --- a/sei-db/db_engine/litt/disktable/control_loop_messages.go +++ b/sei-db/db_engine/litt/disktable/control_loop_messages.go @@ -24,7 +24,7 @@ type controlLoopWriteRequest struct { controlLoopMessage // values is a slice of key-value pairs to write. - values []*types.KVPair + values []*types.PutRequest } // controlLoopSetShardingFactorRequest is a request to set the sharding factor that is sent to the control loop. diff --git a/sei-db/db_engine/litt/disktable/disk_table.go b/sei-db/db_engine/litt/disktable/disk_table.go index b58f7bfd1c..e2ee3b595b 100644 --- a/sei-db/db_engine/litt/disktable/disk_table.go +++ b/sei-db/db_engine/litt/disktable/disk_table.go @@ -724,43 +724,86 @@ func (d *DiskTable) CacheAwareGet( return value, true, false, nil } -func (d *DiskTable) Put(key []byte, value []byte) error { - return d.PutBatch([]*types.KVPair{{Key: key, Value: value}}) +func (d *DiskTable) Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error { + return d.PutBatch([]*types.PutRequest{{Key: key, Value: value, SecondaryKeys: secondaryKeys}}) } -func (d *DiskTable) PutBatch(batch []*types.KVPair) error { +func (d *DiskTable) PutBatch(batch []*types.PutRequest) error { if ok, err := d.errorMonitor.IsOk(); !ok { return fmt.Errorf("cannot process PutBatch() request, DB is in panicked state due to error: %w", err) } - if d.metrics != nil { - start := d.clock() - totalSize := uint64(0) - for _, kv := range batch { - totalSize += uint64(len(kv.Value)) - } - defer func() { - end := d.clock() - delta := end.Sub(start) - d.metrics.ReportWriteOperation(d.name, delta, uint64(len(batch)), totalSize) - }() - } + // Per-request key count (primary + secondaries). Pre-computed during validation so we can use + // it both for metrics and for the keyCount.Add() at the end. + totalKeys := int64(0) + totalSize := uint64(0) for _, kv := range batch { - if len(kv.Key) > math.MaxUint32 { - return fmt.Errorf("key is too large, length must not exceed 2^32 bytes: %d bytes", len(kv.Key)) - } - if len(kv.Value) > math.MaxUint32 { - return fmt.Errorf("value is too large, length must not exceed 2^32 bytes: %d bytes", len(kv.Value)) - } if kv.Key == nil { return fmt.Errorf("nil keys are not supported") } if kv.Value == nil { return fmt.Errorf("nil values are not supported") } + if len(kv.Key) > math.MaxUint16 { + return fmt.Errorf("key is too large, length must not exceed 2^16 bytes: %d bytes", len(kv.Key)) + } + if len(kv.Value) > math.MaxUint32 { + return fmt.Errorf("value is too large, length must not exceed 2^32 bytes: %d bytes", len(kv.Value)) + } + // Validate every secondary key in this request, and detect duplicate keys (primary vs + // secondary, secondary vs secondary) within the request. Cross-request collisions remain + // the caller's responsibility, matching existing semantics for primary keys. + seen := make(map[string]struct{}, 1+len(kv.SecondaryKeys)) + seen[util.UnsafeBytesToString(kv.Key)] = struct{}{} + for _, sk := range kv.SecondaryKeys { + if sk == nil { + return fmt.Errorf("nil secondary key is not supported") + } + if sk.Key == nil { + return fmt.Errorf("nil secondary key bytes are not supported") + } + if len(sk.Key) > math.MaxUint16 { + return fmt.Errorf("secondary key is too large, length must not exceed 2^16 bytes: %d bytes", + len(sk.Key)) + } + end := uint64(sk.Offset) + uint64(sk.Length) + if end > uint64(len(kv.Value)) { + return fmt.Errorf( + "secondary key range [%d, %d) exceeds value length %d", sk.Offset, end, len(kv.Value)) + } + skKey := util.UnsafeBytesToString(sk.Key) + if _, dup := seen[skKey]; dup { + return fmt.Errorf("duplicate key %x within PutRequest", sk.Key) + } + seen[skKey] = struct{}{} + } + + totalKeys += int64(1 + len(kv.SecondaryKeys)) + totalSize += uint64(len(kv.Value)) + } + + if d.metrics != nil { + start := d.clock() + defer func() { + end := d.clock() + delta := end.Sub(start) + d.metrics.ReportWriteOperation(d.name, delta, uint64(totalKeys), totalSize) //nolint:gosec // totalKeys non-negative + }() + } + + // All requests validated. Populate the unflushed data cache: each key (primary or secondary) + // is stored under its own key, with secondaries pointing at a zero-copy sub-slice of the parent + // value. This makes Get/Exists/CacheAwareGet treat secondaries identically to primaries before + // the data is durable. + for _, kv := range batch { d.unflushedDataCache.Store(util.UnsafeBytesToString(kv.Key), kv.Value) + for _, sk := range kv.SecondaryKeys { + d.unflushedDataCache.Store( + util.UnsafeBytesToString(sk.Key), + kv.Value[sk.Offset:sk.Offset+sk.Length]) + } } request := &controlLoopWriteRequest{ @@ -771,7 +814,7 @@ func (d *DiskTable) PutBatch(batch []*types.KVPair) error { return fmt.Errorf("failed to send write request: %w", err) } - d.keyCount.Add(int64(len(batch))) + d.keyCount.Add(totalKeys) return nil } diff --git a/sei-db/db_engine/litt/disktable/disk_table_secondary_keys_test.go b/sei-db/db_engine/litt/disktable/disk_table_secondary_keys_test.go new file mode 100644 index 0000000000..614fdc8bbc --- /dev/null +++ b/sei-db/db_engine/litt/disktable/disk_table_secondary_keys_test.go @@ -0,0 +1,543 @@ +package disktable + +import ( + "fmt" + "log/slog" + "os" + "path" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/disktable/keymap" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/disktable/segment" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/types" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/util" + "github.com/stretchr/testify/require" +) + +// buildOneShardMemKeyDiskTable creates a disk table with sharding factor 1 so every value lands in +// the same value file. This makes torn-write recovery tests deterministic — we know exactly which +// file to truncate. +func buildOneShardMemKeyDiskTable( + clock func() time.Time, + name string, + paths []string, +) (litt.ManagedTable, error) { + logger := slog.Default() + keymapPath := filepath.Join(paths[0], keymap.KeymapDirectoryName) + keymapTypeFile, err := setupKeymapTypeFile(keymapPath, keymap.MemKeymapType) + if err != nil { + return nil, fmt.Errorf("failed to load keymap type file: %w", err) + } + keys, _, err := keymap.NewMemKeymap(logger, "", true) + if err != nil { + return nil, fmt.Errorf("failed to create keymap: %w", err) + } + config, err := litt.DefaultConfig(paths...) + if err != nil { + return nil, fmt.Errorf("failed to create config: %w", err) + } + config.Clock = clock + config.GCPeriod = time.Millisecond + config.Fsync = false + config.Logger = logger + config.ShardingFactor = 1 + // Pick a target file size large enough that several Puts can co-exist in one segment without + // rotation; the recovery test specifically wants the torn group to share a segment with the + // surviving groups so the all-or-nothing behavior is observable. + config.TargetSegmentFileSize = 1 << 20 + + table, err := NewDiskTable( + config, + name, + keys, + keymapPath, + keymapTypeFile, + paths, + true, + nil, + ) + if err != nil { + return nil, fmt.Errorf("failed to create disk table: %w", err) + } + return table, nil +} + +// This file collects tests specific to the secondary-key API of DiskTable: +// +// * basic reads/exists/key-count semantics for secondaries, both before and after flush, +// * input validation in PutBatch (oversized / nil / out-of-range / duplicate), +// * aliasing the whole value to another key, +// * TTL/GC reaping the primary and its secondaries together, +// * end-to-end recovery proving that a torn final Put loses every key in its group while every +// completed group survives. +// +// The validation, aliasing, KeyCount, restart and recovery tests run against every disk-table +// implementation in tableBuilders. The cached-write-cache test only makes sense for the cached +// variants, so the test bodies skip the other implementations. + +// putBatchSingle is a tiny helper to PutBatch a single PutRequest, which is otherwise verbose. +func putBatchSingle(t *testing.T, table litt.ManagedTable, req *types.PutRequest) { + t.Helper() + require.NoError(t, table.PutBatch([]*types.PutRequest{req})) +} + +// TestSecondaryKeyReadsBeforeAndAfterFlush proves that a secondary key behaves like any other key +// at every read-side boundary. The same Get/Exists call works pre-flush (served from the +// unflushed data cache) and post-flush (served from the keymap + segment Read). +func TestSecondaryKeyReadsBeforeAndAfterFlush(t *testing.T) { + t.Parallel() + for _, tb := range tableBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tb.builder(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + value := []byte("the quick brown fox jumps over the lazy dog") + primary := []byte("primary") + sk1 := &types.SecondaryKey{Key: []byte("quick"), Offset: 4, Length: 5} + sk2 := &types.SecondaryKey{Key: []byte("brown"), Offset: 10, Length: 5} + sk3 := &types.SecondaryKey{Key: []byte("alias"), Offset: 0, Length: uint32(len(value))} + + require.NoError(t, table.Put(primary, value, sk1, sk2, sk3)) + + verify := func(stage string) { + t.Helper() + got, ok, err := table.Get(primary) + require.NoError(t, err, stage) + require.True(t, ok, stage) + require.Equal(t, value, got, stage) + + for _, sk := range []*types.SecondaryKey{sk1, sk2, sk3} { + ok, err := table.Exists(sk.Key) + require.NoError(t, err, stage) + require.True(t, ok, stage) + + got, ok, err := table.Get(sk.Key) + require.NoError(t, err, stage) + require.True(t, ok, stage) + require.Equal(t, value[sk.Offset:sk.Offset+sk.Length], got, stage) + } + + require.EqualValues(t, 4, table.KeyCount(), stage) + } + + verify("before flush") + require.NoError(t, table.Flush()) + verify("after flush") + + require.NoError(t, table.Destroy()) + }) + } +} + +// TestSecondaryKeyValidationErrors verifies that every documented validation rule is enforced and +// that a rejected Put leaves no observable side-effect (KeyCount unchanged). +func TestSecondaryKeyValidationErrors(t *testing.T) { + t.Parallel() + for _, tb := range tableBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tb.builder(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + require.Zero(t, table.KeyCount()) + + value := []byte("hello world") + + // Offset+Length exceeds the value. + err = table.Put([]byte("p1"), value, &types.SecondaryKey{Key: []byte("s1"), Offset: 6, Length: 100}) + require.Error(t, err) + + // nil secondary key bytes. + err = table.Put([]byte("p2"), value, &types.SecondaryKey{Key: nil, Offset: 0, Length: 1}) + require.Error(t, err) + + // secondary key collides with the primary. + err = table.Put([]byte("p3"), value, &types.SecondaryKey{Key: []byte("p3"), Offset: 0, Length: 1}) + require.Error(t, err) + + // two secondaries collide with each other in the same Put. + err = table.Put([]byte("p4"), value, + &types.SecondaryKey{Key: []byte("dup"), Offset: 0, Length: 1}, + &types.SecondaryKey{Key: []byte("dup"), Offset: 1, Length: 1}, + ) + require.Error(t, err) + + // primary key too long. + oversized := make([]byte, 1<<16) + err = table.Put(oversized, value) + require.Error(t, err) + + // secondary key too long. + err = table.Put([]byte("p5"), value, &types.SecondaryKey{Key: oversized, Offset: 0, Length: 1}) + require.Error(t, err) + + // No successful Put happened, so the table must report zero keys. + require.Zero(t, table.KeyCount()) + + require.NoError(t, table.Destroy()) + }) + } +} + +// TestSecondaryKeyAliasing covers the alias-the-whole-value pattern: Put(K, V, {A, 0, len(V)}) → +// Get(K) and Get(A) both return V with KeyCount==2. +func TestSecondaryKeyAliasing(t *testing.T) { + t.Parallel() + for _, tb := range tableBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tb.builder(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + primary := []byte("primary") + alias := []byte("alias") + value := []byte("payload") + require.NoError(t, table.Put(primary, value, + &types.SecondaryKey{Key: alias, Offset: 0, Length: uint32(len(value))})) + require.NoError(t, table.Flush()) + + got, ok, err := table.Get(primary) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, value, got) + + got, ok, err = table.Get(alias) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, value, got) + + require.EqualValues(t, 2, table.KeyCount()) + + require.NoError(t, table.Destroy()) + }) + } +} + +// TestSecondaryKeyTTLGroupExpiration verifies that a primary and all of its secondaries expire +// together: once the TTL window passes for the primary, every secondary becomes unreachable on +// the same GC pass. The buildMemKeyDiskTableSingleShard test config uses TargetSegmentFileSize=100, +// so writing a few hundred bytes of cycler data is enough to rotate past the segment that holds +// our group; once the old segment is sealed and its lastValueTimestamp is older than the TTL, GC +// reaps it. +func TestSecondaryKeyTTLGroupExpiration(t *testing.T) { + t.Parallel() + + rand := util.NewTestRandom() + directory := t.TempDir() + + startTime := rand.Time() + var fakeTime atomic.Pointer[time.Time] + fakeTime.Store(&startTime) + clock := func() time.Time { return *fakeTime.Load() } + + tableName := rand.String(8) + table, err := buildMemKeyDiskTableSingleShard(clock, tableName, []string{directory}) + require.NoError(t, err) + + ttl := 30 * time.Second + require.NoError(t, table.SetTTL(ttl)) + + value := []byte("hello world") + require.NoError(t, table.Put([]byte("primary"), value, + &types.SecondaryKey{Key: []byte("hello"), Offset: 0, Length: 5}, + &types.SecondaryKey{Key: []byte("world"), Offset: 6, Length: 5}, + )) + require.NoError(t, table.Flush()) + require.EqualValues(t, 3, table.KeyCount()) + + // Write enough additional data to push the group's segment past TargetSegmentFileSize (100) + // and force a rotation, so the group's segment becomes sealed and therefore reapable. + for i := 0; i < 50; i++ { + key := []byte(fmt.Sprintf("filler-%03d", i)) + require.NoError(t, table.Put(key, make([]byte, 16))) + } + require.NoError(t, table.Flush()) + + // Advance the clock past the TTL and trigger GC. + advanced := startTime.Add(2 * ttl) + fakeTime.Store(&advanced) + + // One more Put + Flush after the clock advance so the GC pass sees the new lastValueTimestamp + // on any active segment. + require.NoError(t, table.Put([]byte("post-advance"), []byte("trigger"))) + require.NoError(t, table.Flush()) + + require.NoError(t, table.(*DiskTable).RunGC()) + + // Wait for the GC pass to reap the expired segment. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + ok, err := table.Exists([]byte("primary")) + require.NoError(t, err) + if !ok { + break + } + time.Sleep(20 * time.Millisecond) + require.NoError(t, table.(*DiskTable).RunGC()) + } + + // Primary and all secondaries are reaped together. + for _, key := range [][]byte{[]byte("primary"), []byte("hello"), []byte("world")} { + ok, err := table.Exists(key) + require.NoError(t, err) + require.False(t, ok, "expected expired key %q to be gone", key) + } + + require.NoError(t, table.Destroy()) +} + +// restartWithSecondariesTest exercises the table-restart code path with a workload that mixes +// 0-3 secondaries per Put. After restart every primary AND every secondary must still be +// readable. This is the disk-table-level analogue of the existing TestRestart, and pins down the +// keymap-reload behavior for the new ScopedKey.Kind field. +func restartWithSecondariesTest(t *testing.T, tableBuilder *tableBuilder) { + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tableBuilder.builder(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + // keyToValue holds the expected bytes for each surviving key (primary OR secondary). + keyToValue := make(map[string][]byte) + + iterations := 200 + restartIteration := iterations / 2 + for i := 0; i < iterations; i++ { + if i == restartIteration { + require.NoError(t, table.Close()) + table, err = tableBuilder.builder(time.Now, tableName, []string{directory}) + require.NoError(t, err) + for k, v := range keyToValue { + got, ok, err := table.Get([]byte(k)) + require.NoError(t, err) + require.True(t, ok, "key %q lost across restart", k) + require.Equal(t, v, got) + } + } + + primary := rand.PrintableVariableBytes(16, 32) + value := rand.PrintableVariableBytes(8, 64) + + // 0-3 secondaries; offsets/lengths chosen to span both strict sub-ranges and the whole + // value. + nSecondaries := int(rand.Int32Range(0, 4)) + secondaries := make([]*types.SecondaryKey, 0, nSecondaries) + for s := 0; s < nSecondaries; s++ { + offset := uint32(rand.Int32Range(0, int32(len(value)))) + maxLen := uint32(len(value)) - offset + if maxLen == 0 { + continue + } + length := uint32(rand.Int32Range(1, int32(maxLen+1))) + skKey := rand.PrintableVariableBytes(16, 32) + if _, exists := keyToValue[string(skKey)]; exists { + continue + } + secondaries = append(secondaries, &types.SecondaryKey{Key: skKey, Offset: offset, Length: length}) + } + + require.NoError(t, table.Put(primary, value, secondaries...)) + keyToValue[string(primary)] = value + for _, sk := range secondaries { + keyToValue[string(sk.Key)] = value[sk.Offset : sk.Offset+sk.Length] + } + + if rand.BoolWithProbability(0.1) { + require.NoError(t, table.Flush()) + } + } + + require.NoError(t, table.Flush()) + for k, v := range keyToValue { + got, ok, err := table.Get([]byte(k)) + require.NoError(t, err) + require.True(t, ok, "key %q missing at end of test", k) + require.Equal(t, v, got) + } + + require.NoError(t, table.Destroy()) +} + +func TestRestartWithSecondaries(t *testing.T) { + t.Parallel() + for _, tb := range tableBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + restartWithSecondariesTest(t, tb) + }) + } +} + +// TestGroupAtomicRecoveryEndToEnd is the high-level analogue of the segment-level +// TestSealLoadedSegmentGroupAtomicity: a torn final Put loses every key in its group while every +// completed Put survives. We drive this through DiskTable's public API to make sure the group +// atomicity invariant survives the keymap reload that happens at startup. +// +// The test runs only against MemKeyDiskTableSingleShard, where we know the segment layout exactly +// so we can corrupt it deterministically. The recovery contract is the same for the other disk +// table flavors (they share the same segment package) but corrupting a multi-shard layout would +// require figuring out which shard hosted the torn write. +func TestGroupAtomicRecoveryEndToEnd(t *testing.T) { + t.Parallel() + + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + + table, err := buildOneShardMemKeyDiskTable(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + // Two completed Puts, then a third Put whose value we will truncate. We Flush after each to + // move the writes through the value file's flushedSize boundary. + require.NoError(t, table.Put([]byte("survivor-1"), []byte("v1"))) + require.NoError(t, table.Flush()) + + require.NoError(t, table.Put([]byte("survivor-primary"), []byte("hello"), + &types.SecondaryKey{Key: []byte("survivor-secondary"), Offset: 0, Length: 5}, + )) + require.NoError(t, table.Flush()) + + require.NoError(t, table.Put([]byte("torn-primary"), []byte("worldwide"), + &types.SecondaryKey{Key: []byte("torn-secondary"), Offset: 0, Length: 5}, + )) + require.NoError(t, table.Flush()) + + require.NoError(t, table.Close()) + + // Find the segment that holds the torn write: it's the highest-indexed segment whose value + // file is non-empty. (Disk table may rotate to a fresh segment after each flush, so the very + // latest metadata may belong to an empty rollover segment.) + segmentDir := findLatestSegmentDir(t, directory, tableName) + require.NotEmpty(t, segmentDir) + segIdx := segmentIndexWithLargestValueFile(t, segmentDir) + + // Truncate the value file so the primary's tail goes missing. The secondary at the front + // would individually fit, but group-atomic recovery drops it anyway. + valPath := path.Join(segmentDir, fmt.Sprintf("%d-0%s", segIdx, segment.ValuesFileExtension)) + data, err := os.ReadFile(valPath) + require.NoError(t, err) + require.GreaterOrEqual(t, len(data), 3) + require.NoError(t, os.WriteFile(valPath, data[:len(data)-3], 0600)) + + // Flip the metadata's sealed byte from 1 back to 0 to simulate a crash before sealing. This + // is what makes LoadSegment run the recovery path on reopen. + metaPath := path.Join(segmentDir, fmt.Sprintf("%d%s", segIdx, segment.MetadataFileExtension)) + mdBytes, err := os.ReadFile(metaPath) + require.NoError(t, err) + require.Equal(t, segment.V3MetadataSize, len(mdBytes)) + mdBytes[segment.V3MetadataSize-1] = 0 + require.NoError(t, os.WriteFile(metaPath, mdBytes, 0600)) + + // Reopen. + table, err = buildOneShardMemKeyDiskTable(time.Now, tableName, []string{directory}) + require.NoError(t, err) + + // Survivors remain. + for _, key := range [][]byte{ + []byte("survivor-1"), + []byte("survivor-primary"), + []byte("survivor-secondary"), + } { + ok, err := table.Exists(key) + require.NoError(t, err) + require.True(t, ok, "expected %q to survive recovery", key) + } + + // Torn group is gone, both primary and secondary. + for _, key := range [][]byte{[]byte("torn-primary"), []byte("torn-secondary")} { + ok, err := table.Exists(key) + require.NoError(t, err) + require.False(t, ok, "expected %q to be discarded by recovery", key) + } + + require.NoError(t, table.Destroy()) +} + +// findLatestSegmentDir locates the segment directory created by the single-shard mem-keymap disk +// table at the given root. The directory layout is +// //segments/, with each segment occupying a triple of files prefixed by its +// segment index. We return the segments directory itself; the test then walks its files to find +// the highest-indexed segment. +func findLatestSegmentDir(t *testing.T, root, tableName string) string { + t.Helper() + segmentsDir := filepath.Join(root, tableName, "segments") + info, err := os.Stat(segmentsDir) + require.NoError(t, err) + require.True(t, info.IsDir()) + return segmentsDir +} + +// segmentIndexWithLargestValueFile walks the segments directory and returns the index of the +// segment whose value file is the largest. The torn write we corrupt in the recovery test always +// lives in the segment with the most value bytes (the one we wrote into most recently); ignoring +// rollover-only segments (which may exist after a Close) makes the test robust to whatever the +// disk table happens to do at shutdown. +func segmentIndexWithLargestValueFile(t *testing.T, segmentsDir string) uint32 { + t.Helper() + entries, err := os.ReadDir(segmentsDir) + require.NoError(t, err) + + var bestIdx uint32 + var bestSize int64 = -1 + for _, e := range entries { + name := e.Name() + const suffix = segment.ValuesFileExtension + if len(name) < len(suffix) || name[len(name)-len(suffix):] != suffix { + continue + } + // value files are named "-.values"; we always corrupt shard 0 so we only + // consider files whose shard portion is "0". + stripped := name[:len(name)-len(suffix)] + dash := -1 + for i := len(stripped) - 1; i >= 0; i-- { + if stripped[i] == '-' { + dash = i + break + } + } + require.GreaterOrEqual(t, dash, 0) + if stripped[dash+1:] != "0" { + continue + } + idx, err := parseUint32(stripped[:dash]) + require.NoError(t, err) + + info, err := e.Info() + require.NoError(t, err) + if info.Size() > bestSize { + bestSize = info.Size() + bestIdx = idx + } + } + require.GreaterOrEqual(t, bestSize, int64(0), "no value files found in %s", segmentsDir) + return bestIdx +} + +func parseUint32(s string) (uint32, error) { + var n uint32 + for _, r := range s { + if r < '0' || r > '9' { + return 0, fmt.Errorf("not a number: %q", s) + } + n = n*10 + uint32(r-'0') + } + return n, nil +} diff --git a/sei-db/db_engine/litt/disktable/disk_table_test.go b/sei-db/db_engine/litt/disktable/disk_table_test.go index 3ab9418ce6..360431f7a1 100644 --- a/sei-db/db_engine/litt/disktable/disk_table_test.go +++ b/sei-db/db_engine/litt/disktable/disk_table_test.go @@ -315,11 +315,11 @@ func restartTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -407,11 +407,11 @@ func middleFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDelet require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -529,11 +529,11 @@ func initialFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDele require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -625,11 +625,11 @@ func initialFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDele require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -721,11 +721,11 @@ func lastFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDelete require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -824,11 +824,11 @@ func lastFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDelete require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -919,11 +919,11 @@ func truncatedKeyFileTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1060,11 +1060,11 @@ func truncatedKeyFileTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1149,11 +1149,11 @@ func truncatedValueFileTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1305,11 +1305,11 @@ func truncatedValueFileTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1395,11 +1395,11 @@ func unflushedKeysTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1515,11 +1515,11 @@ func unflushedKeysTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1790,11 +1790,11 @@ func restartWithMultipleStorageDirectoriesTest(t *testing.T, tableBuilder *table require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -1976,11 +1976,11 @@ func changingShardingFactorTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -2099,11 +2099,11 @@ func tableSizeTest(t *testing.T, tableBuilder *tableBuilder) { expectedValues[string(key)] = value creationTimes[string(key)] = newTime } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value creationTimes[string(key)] = newTime } diff --git a/sei-db/db_engine/litt/disktable/segment/key_file.go b/sei-db/db_engine/litt/disktable/segment/key_file.go index 7bc4684abe..9ca967943b 100644 --- a/sei-db/db_engine/litt/disktable/segment/key_file.go +++ b/sei-db/db_engine/litt/disktable/segment/key_file.go @@ -169,14 +169,32 @@ func (k *keyFile) atomicSwap(sync bool) error { return nil } +// KeyRecordHeaderSize is the on-disk size of the fixed-width portion of a key-file record that +// precedes the variable-length key bytes: one byte of KeyKind followed by a uint16 key-length prefix. +const KeyRecordHeaderSize = 3 + +// MaxKeyLength is the maximum permitted length of a key in bytes. The key file stores the key +// length as a uint16, which is more than enough headroom for any realistic key. +const MaxKeyLength = 1<<16 - 1 + // write writes a key to the key file. func (k *keyFile) write(scopedKey *types.ScopedKey) error { if k.writer == nil { return fmt.Errorf("key file is sealed") } - // Write the length of the key. - err := binary.Write(k.writer, binary.BigEndian, uint32(len(scopedKey.Key))) //nolint:gosec // key length fits uint32 + if len(scopedKey.Key) > MaxKeyLength { + return fmt.Errorf("key length %d exceeds maximum of %d", len(scopedKey.Key), MaxKeyLength) + } + + // Write the kind byte (1 B). + err := k.writer.WriteByte(byte(scopedKey.Kind)) + if err != nil { + return fmt.Errorf("failed to write kind to key file: %w", err) + } + + // Write the length of the key (2 B, big-endian). + err = binary.Write(k.writer, binary.BigEndian, uint16(len(scopedKey.Key))) //nolint:gosec // bounded above if err != nil { return fmt.Errorf("failed to write key length to key file: %w", err) } @@ -194,7 +212,7 @@ func (k *keyFile) write(scopedKey *types.ScopedKey) error { } k.size += uint64( //nolint:gosec // sizes are non-negative - 4 /* uint32 size of key */ + + KeyRecordHeaderSize + len(scopedKey.Key) + types.AddressSerializedSize) @@ -266,14 +284,13 @@ func (k *keyFile) readKeys() ([]*types.ScopedKey, error) { keys := make([]*types.ScopedKey, 0) index := 0 - for { - // We need at least 4 bytes to read the length of the key. - if index+4 > len(keyBytes) { //nolint:staticcheck // QF1006 - // There are fewer than 4 bytes left in the file. - break - } - keyLength := int(binary.BigEndian.Uint32(keyBytes[index : index+4])) - index += 4 + // We need the fixed-width header (kind + uint16 key length) before we can decide whether the + // next record fits. + for index+KeyRecordHeaderSize <= len(keyBytes) { + kind := types.KeyKind(keyBytes[index]) + index++ + keyLength := int(binary.BigEndian.Uint16(keyBytes[index : index+2])) + index += 2 // We need to read the key, as well as the serialized address (which embeds the shard ID and value size). if index+keyLength+types.AddressSerializedSize > len(keyBytes) { @@ -293,6 +310,7 @@ func (k *keyFile) readKeys() ([]*types.ScopedKey, error) { keys = append(keys, &types.ScopedKey{ Key: key, Address: address, + Kind: kind, }) } diff --git a/sei-db/db_engine/litt/disktable/segment/key_file_test.go b/sei-db/db_engine/litt/disktable/segment/key_file_test.go index 0ed106e98e..5c2e88f519 100644 --- a/sei-db/db_engine/litt/disktable/segment/key_file_test.go +++ b/sei-db/db_engine/litt/disktable/segment/key_file_test.go @@ -19,6 +19,16 @@ func TestReadWriteKeys(t *testing.T) { index := rand.Uint32() + // Cycle through all four KeyKind values so the on-disk record layout is exercised end-to-end. + // Index 0 has the implicit zero-value (KeyKindStandalone), confirming that a ScopedKey with + // no explicit Kind round-trips as a Standalone primary. + kinds := []types.KeyKind{ + types.KeyKindStandalone, + types.KeyKindPrimary, + types.KeyKindSecondary, + types.KeyKindFinalSecondary, + } + keyCount := rand.Int32Range(100, 200) keys := make([]*types.ScopedKey, keyCount) for i := 0; i < int(keyCount); i++ { @@ -29,7 +39,12 @@ func TestReadWriteKeys(t *testing.T) { uint8(rand.Uint32Range(0, 256)), rand.Uint32(), ) - keys[i] = &types.ScopedKey{Key: key, Address: address} + // First record left implicit (Kind defaults to zero/Standalone); the rest rotate. + var kind types.KeyKind + if i != 0 { + kind = kinds[i%len(kinds)] + } + keys[i] = &types.ScopedKey{Key: key, Address: address, Kind: kind} } segmentPath, err := NewSegmentPath(directory, "", "table") diff --git a/sei-db/db_engine/litt/disktable/segment/metadata_file.go b/sei-db/db_engine/litt/disktable/segment/metadata_file.go index fe6d950e0c..1f53836ba3 100644 --- a/sei-db/db_engine/litt/disktable/segment/metadata_file.go +++ b/sei-db/db_engine/litt/disktable/segment/metadata_file.go @@ -22,7 +22,8 @@ const ( // deleted. MetadataSwapExtension = MetadataFileExtension + util.SwapFileExtension - // V3MetadataSize is the size of the metadata file at LatestSegmentVersion (ShardedAddressSegmentVersion). + // V3MetadataSize is the size of the metadata file at the current LatestSegmentVersion (the name + // is kept for backwards compatibility; the metadata layout has not actually changed since v3). // Layout: // - 4 bytes for version // - 1 byte for the sharding factor diff --git a/sei-db/db_engine/litt/disktable/segment/segment.go b/sei-db/db_engine/litt/disktable/segment/segment.go index 06a51a416e..203bd65a0b 100644 --- a/sei-db/db_engine/litt/disktable/segment/segment.go +++ b/sei-db/db_engine/litt/disktable/segment/segment.go @@ -261,40 +261,100 @@ func (s *Segment) SegmentIndex() uint32 { // sealLoadedSegment is responsible for sealing a segment loaded from disk that is not already sealed. // While doing this, it is responsible for making the key file consistent with the values present in the // value files. +// +// Recovery is "group-atomic": every Put that wrote 1+N key file records (one primary + N secondaries) +// is either kept whole or dropped whole. A group is kept iff (1) its closing record +// (KeyKindStandalone for a 0-secondary Put, or KeyKindFinalSecondary for an N>=1 Put) is present in +// the key file, and (2) every address in the group fits within the flushed bytes of its value file. +// Any other state (partial keyfile record, primary without a closing terminator, stray secondary not +// preceded by a primary, value-file truncated mid-group) results in the entire group being discarded. func (s *Segment) sealLoadedSegment(now time.Time) error { scopedKeys, err := s.keys.readKeys() if err != nil { return fmt.Errorf("failed to read keys: %w", err) } - // keys with values that are not present in the value files + // keys belonging to groups that passed both key-file and value-file completeness checks goodKeys := make([]*types.ScopedKey, 0, len(scopedKeys)) - // keys with values that weren't flushed out to the value files before the DB crashed + // keys belonging to groups that were torn (either mid-key-file or mid-value-file) badKeys := make([]*types.ScopedKey, 0, len(scopedKeys)) + // commitGroup applies the all-or-nothing value-file completeness check to a group's keys and + // routes them to goodKeys or badKeys accordingly. A group survives only if every address in it + // is fully present in its shard's value file. + commitGroup := func(group []*types.ScopedKey) { + if len(group) == 0 { + return + } + for _, sk := range group { + shard := sk.Address.ShardID() + end := uint64(sk.Address.Offset()) + uint64(sk.Address.ValueSize()) + if s.shards[shard].Size() < end { + badKeys = append(badKeys, group...) + return + } + } + goodKeys = append(goodKeys, group...) + } + + // Validate shard IDs up front: a shard ID beyond the segment's sharding factor cannot come from + // normal operation, so we treat it as disk corruption and refuse to seal the segment rather + // than risk silently dropping data. for _, scopedKey := range scopedKeys { shard := scopedKey.Address.ShardID() - if int(shard) >= len(s.shards) { - // A shard ID that exceeds the segment's sharding factor cannot be the result of normal - // operation, so treat it as disk corruption and refuse to seal the segment. Recovery here - // would risk silently dropping data; require human intervention instead. return fmt.Errorf( "segment %d has key with shard ID %d outside sharding factor %d: data corruption detected", s.index, shard, len(s.shards)) } + } - requiredValueFileLength := uint64(scopedKey.Address.Offset()) + - 4 /* value size uint32 */ + - uint64(scopedKey.Address.ValueSize()) - - if s.shards[shard].Size() < requiredValueFileLength { - badKeys = append(badKeys, scopedKey) - } else { - goodKeys = append(goodKeys, scopedKey) + // Walk records in order, accumulating a group buffer that we commit on each terminator. + var currentGroup []*types.ScopedKey + for _, scopedKey := range scopedKeys { + switch scopedKey.Kind { + case types.KeyKindStandalone: + // A standalone primary closes its group immediately. Any in-flight group (which would + // indicate a torn primary-with-secondaries write that was followed by a fresh + // standalone) is dropped. + if len(currentGroup) > 0 { + badKeys = append(badKeys, currentGroup...) + currentGroup = nil + } + commitGroup([]*types.ScopedKey{scopedKey}) + case types.KeyKindPrimary: + // Starting a new group. Any in-flight group is torn. + if len(currentGroup) > 0 { + badKeys = append(badKeys, currentGroup...) + currentGroup = nil + } + currentGroup = append(currentGroup, scopedKey) + case types.KeyKindSecondary: + // A secondary that is not preceded by a primary is a stray record (its primary was torn + // off the front of the file or never written). Drop it. Otherwise, accumulate. + if len(currentGroup) == 0 { + badKeys = append(badKeys, scopedKey) + } else { + currentGroup = append(currentGroup, scopedKey) + } + case types.KeyKindFinalSecondary: + if len(currentGroup) == 0 { + badKeys = append(badKeys, scopedKey) + } else { + currentGroup = append(currentGroup, scopedKey) + commitGroup(currentGroup) + currentGroup = nil + } + default: + return fmt.Errorf("segment %d has key file record with unknown kind %d: data corruption detected", + s.index, scopedKey.Kind) } } + // A group that was never closed (the file ended before its FinalSecondary was written) is torn. + if len(currentGroup) > 0 { + badKeys = append(badKeys, currentGroup...) + } if len(badKeys) > 0 { // We have at least one bad key. Rewrite the keyfile with only the good keys. @@ -395,18 +455,25 @@ func (s *Segment) SetNextSegment(nextSegment *Segment) { s.nextSegment = nextSegment } -// Write records a key-value pair in the data segment, returning the maximum size of all shards within this segment. +// Write records a key-value pair (with optional secondary keys) in the data segment, returning the +// running key count and key-file size of the segment. // -// This method does not ensure that the key-value pair is actually written to disk, only that it will eventually be -// written to disk. Flush must be called to ensure that all data previously passed to Write is written to disk. -func (s *Segment) Write(data *types.KVPair) (keyCount uint32, keyFileSize uint64, err error) { +// This method does not ensure that the key-value pair is actually written to disk, only that it will +// eventually be written to disk. Flush must be called to ensure that all data previously passed to +// Write is written to disk. +// +// The primary key and all of its secondary keys are written contiguously to the key file in a single +// "group": the primary first, followed by each secondary in order. The kind tag on the primary +// (KeyKindStandalone vs. KeyKindPrimary) and on the last secondary (KeyKindFinalSecondary) is what +// lets recovery distinguish a fully-written group from a torn write. +func (s *Segment) Write(data *types.PutRequest) (keyCount uint32, keyFileSize uint64, err error) { if s.metadata.sealed { return 0, 0, fmt.Errorf("segment is sealed, cannot write data") } - // Shard assignment is round-robin: each successive call deposits the value into the next shard, wrapping around - // after metadata.shardingFactor calls. This is safe to do without locking because Write is invoked exclusively - // from the disk_table control loop goroutine. + // Shard assignment is round-robin: each successive call deposits the value into the next shard, + // wrapping around after metadata.shardingFactor calls. This is safe to do without locking + // because Write is invoked exclusively from the disk_table control loop goroutine. shard := s.nextShard s.nextShard++ if s.nextShard == s.metadata.shardingFactor { @@ -421,15 +488,45 @@ func (s *Segment) Write(data *types.KVPair) (keyCount uint32, keyFileSize uint64 return 0, 0, fmt.Errorf("value file already contains %d bytes, cannot add a new value", currentSize) } - s.unflushedKeyCount.Add(1) firstByteIndex := uint32(currentSize) + valueLen := uint64(len(data.Value)) + if uint64(firstByteIndex)+valueLen > math.MaxUint32 { + return 0, 0, + fmt.Errorf("value of length %d would push value file past 2^32 bytes (current size %d)", + valueLen, currentSize) + } + + // Validate every secondary key's address fits in uint32 *before* sending anything, so we never + // produce a partial write. + for _, sk := range data.SecondaryKeys { + end := uint64(firstByteIndex) + uint64(sk.Offset) + uint64(sk.Length) + if end > math.MaxUint32 { + return 0, 0, + fmt.Errorf("secondary key range [%d, %d) would exceed 2^32 byte addressable range", + uint64(firstByteIndex)+uint64(sk.Offset), end) + } + } + + n := len(data.SecondaryKeys) + totalKeys := uint32(1 + n) //nolint:gosec // n bounded by caller validation - s.shardSizes[shard] += uint64(len(data.Value)) + 4 /* uint32 length */ + // Determine kind of the primary key based on whether secondaries follow it. + primaryKind := types.KeyKindStandalone + if n > 0 { + primaryKind = types.KeyKindPrimary + } + + // Update accounting before sending so that callers observe consistent state. + s.unflushedKeyCount.Add(int64(totalKeys)) + s.shardSizes[shard] += valueLen if s.shardSizes[shard] > s.maxShardSize { s.maxShardSize = s.shardSizes[shard] } - s.keyCount++ - s.keyFileSize += uint64(len(data.Key)) + 4 /* uint32 length */ + types.AddressSerializedSize + s.keyCount += totalKeys + s.keyFileSize += keyRecordSize(data.Key) + for _, sk := range data.SecondaryKeys { + s.keyFileSize += keyRecordSize(sk.Key) + } // Forward the value to the shard control loop, which asynchronously writes it to the value file. shardRequest := &valueToWrite{ @@ -442,21 +539,45 @@ func (s *Segment) Write(data *types.KVPair) (keyCount uint32, keyFileSize uint64 fmt.Errorf("failed to send value to shard control loop: %v", err) } - // Forward the value to the key and its address file control loop, which asynchronously writes it to the key file. - keyRequest := &types.ScopedKey{ + // Forward the primary key to the key file control loop, which asynchronously writes it to the + // key file. Primary always goes first; recovery relies on this ordering. + primaryRequest := &types.ScopedKey{ Key: data.Key, - Address: types.NewAddress(s.index, firstByteIndex, shard, uint32(len(data.Value))), //nolint:gosec // value len fits uint32 + Address: types.NewAddress(s.index, firstByteIndex, shard, uint32(valueLen)), //nolint:gosec // bounded above + Kind: primaryKind, } - - err = util.Send(s.errorMonitor, s.keyFileChannel, keyRequest) + err = util.Send(s.errorMonitor, s.keyFileChannel, primaryRequest) if err != nil { return 0, 0, fmt.Errorf("failed to send key to key file control loop: %v", err) } + for i, sk := range data.SecondaryKeys { + kind := types.KeyKindSecondary + if i == n-1 { + kind = types.KeyKindFinalSecondary + } + secondaryRequest := &types.ScopedKey{ + Key: sk.Key, + Address: types.NewAddress(s.index, firstByteIndex+sk.Offset, shard, sk.Length), + Kind: kind, + } + err = util.Send(s.errorMonitor, s.keyFileChannel, secondaryRequest) + if err != nil { + return 0, 0, fmt.Errorf("failed to send secondary key to key file control loop: %v", err) + } + } + return s.keyCount, s.keyFileSize, nil } +// keyRecordSize returns the number of bytes a key file record consumes given a key of the supplied +// length. Includes the kind byte (1), the uint16 key-length prefix (2), the key bytes, and the +// fixed-width serialized address. +func keyRecordSize(key []byte) uint64 { + return uint64(KeyRecordHeaderSize) + uint64(len(key)) + uint64(types.AddressSerializedSize) //nolint:gosec // sizes non-negative +} + // GetMaxShardSize returns the maximum size of all shards in this segment. func (s *Segment) GetMaxShardSize() uint64 { return s.maxShardSize @@ -482,7 +603,7 @@ func (s *Segment) Read(key []byte, dataAddress types.Address) ([]byte, error) { return nil, fmt.Errorf("failed to resolve shard for read: %w", err) } - value, err := values.read(dataAddress.Offset()) + value, err := values.read(dataAddress.Offset(), dataAddress.ValueSize()) if err != nil { return nil, fmt.Errorf("failed to read value: %w", err) } diff --git a/sei-db/db_engine/litt/disktable/segment/segment_test.go b/sei-db/db_engine/litt/disktable/segment/segment_test.go index cec1f494e6..8fc86c1e67 100644 --- a/sei-db/db_engine/litt/disktable/segment/segment_test.go +++ b/sei-db/db_engine/litt/disktable/segment/segment_test.go @@ -2,8 +2,10 @@ package segment import ( "bytes" + "fmt" "log/slog" "os" + "path" "sort" "testing" "time" @@ -67,9 +69,9 @@ func TestWriteAndReadSegmentSingleShard(t *testing.T) { value := values[i] expectedValues[string(key)] = value - expectedLargestShardSize += uint64(len(value)) + 4 /* uint32 length */ + expectedLargestShardSize += uint64(len(value)) - _, _, err := seg.Write(&types.KVPair{Key: key, Value: value}) + _, _, err := seg.Write(&types.PutRequest{Key: key, Value: value}) largestShardSize := seg.GetMaxShardSize() require.NoError(t, err) require.Equal(t, expectedLargestShardSize, largestShardSize) @@ -216,10 +218,10 @@ func TestWriteAndReadSegmentMultiShard(t *testing.T) { value := values[i] expectedValues[string(key)] = value - _, _, err := seg.Write(&types.KVPair{Key: key, Value: value}) + _, _, err := seg.Write(&types.PutRequest{Key: key, Value: value}) require.NoError(t, err) largestShardSize := seg.GetMaxShardSize() - require.True(t, largestShardSize >= uint64(len(value)+4)) + require.True(t, largestShardSize >= uint64(len(value))) // Occasionally flush the segment to disk. if rand.BoolWithProbability(0.25) { @@ -374,10 +376,10 @@ func TestWriteAndReadColdShard(t *testing.T) { value := values[i] expectedValues[string(key)] = value - _, _, err := seg.Write(&types.KVPair{Key: key, Value: value}) + _, _, err := seg.Write(&types.PutRequest{Key: key, Value: value}) require.NoError(t, err) largestShardSize := seg.GetMaxShardSize() - require.True(t, largestShardSize >= uint64(len(value)+4)) + require.True(t, largestShardSize >= uint64(len(value))) } // Seal the segment and read all keys and values. @@ -549,7 +551,7 @@ func TestRoundRobinShardAssignment(t *testing.T) { for i := 0; i < valueCount; i++ { key := rand.PrintableVariableBytes(8, 32) value := rand.PrintableVariableBytes(8, 32) - _, _, err := seg.Write(&types.KVPair{Key: key, Value: value}) + _, _, err := seg.Write(&types.PutRequest{Key: key, Value: value}) require.NoError(t, err) flushFn, err := seg.Flush() @@ -580,3 +582,437 @@ func TestRoundRobinShardAssignment(t *testing.T) { "shard %d received %d values, expected %d", s, perShardCounts[s], valuesPerShard) } } + +// writeNoErr is a tiny wrapper that asserts seg.Write succeeded. seg.Write returns three values, so +// we cannot pass its result directly to require.NoError. +func writeNoErr(t *testing.T, seg *Segment, req *types.PutRequest) { + t.Helper() + _, _, err := seg.Write(req) + require.NoError(t, err) +} + +// newSingleShardSegment is a small test helper that creates a fresh single-shard segment for tests +// that need to control on-disk layout exactly. It returns the segment and the segment path so the +// caller can locate the on-disk files after the segment is sealed. +func newSingleShardSegment(t *testing.T) (*Segment, *SegmentPath, uint32) { + t.Helper() + rand := util.NewTestRandom() + logger := slog.Default() + directory := t.TempDir() + index := rand.Uint32() + + segmentPath, err := NewSegmentPath(directory, "", "table") + require.NoError(t, err) + require.NoError(t, segmentPath.MakeDirectories(false)) + + seg, err := CreateSegment( + logger, + util.NewErrorMonitor(t.Context(), logger, nil), + index, + []*SegmentPath{segmentPath}, + false, + 1, + false, + ) + require.NoError(t, err) + return seg, segmentPath, index +} + +// keysByKey indexes a slice of ScopedKey by key bytes for easier lookup. +func keysByKey(keys []*types.ScopedKey) map[string]*types.ScopedKey { + out := make(map[string]*types.ScopedKey, len(keys)) + for _, k := range keys { + out[string(k.Key)] = k + } + return out +} + +// TestSegmentSecondaryKeyAddresses verifies that a Put with a primary plus several secondaries +// produces one ScopedKey per key, that each Address reads back the correct (sub-)range of the +// stored value, that the per-record Kind tags match the group structure, and that a Put with no +// secondaries emits a single Standalone record. +func TestSegmentSecondaryKeyAddresses(t *testing.T) { + t.Parallel() + + value := []byte("the quick brown fox jumps over the lazy dog") + primaryKey := []byte("primary") + // Mix of strict sub-range secondaries and one alias-the-whole-value secondary. + sk1 := &types.SecondaryKey{Key: []byte("quick"), Offset: 4, Length: 5} // "quick" + sk2 := &types.SecondaryKey{Key: []byte("brown"), Offset: 10, Length: 5} // "brown" + sk3 := &types.SecondaryKey{Key: []byte("whole"), Offset: 0, Length: uint32(len(value))} + standaloneKey := []byte("standalone") + standaloneValue := []byte("no-secondaries-here") + + seg, _, _ := newSingleShardSegment(t) + + _, _, err := seg.Write(&types.PutRequest{ + Key: primaryKey, + Value: value, + SecondaryKeys: []*types.SecondaryKey{sk1, sk2, sk3}, + }) + require.NoError(t, err) + + _, _, err = seg.Write(&types.PutRequest{Key: standaloneKey, Value: standaloneValue}) + require.NoError(t, err) + + flushedKeys, err := seg.Seal(time.Now()) + require.NoError(t, err) + require.Len(t, flushedKeys, 5) + + byKey := keysByKey(flushedKeys) + + // Primary readback. + primary := byKey[string(primaryKey)] + require.NotNil(t, primary) + require.Equal(t, types.KeyKindPrimary, primary.Kind) + got, err := seg.Read(primary.Key, primary.Address) + require.NoError(t, err) + require.Equal(t, value, got) + + // Secondary readback. + for i, sk := range []*types.SecondaryKey{sk1, sk2, sk3} { + entry := byKey[string(sk.Key)] + require.NotNil(t, entry, "secondary %d missing from flushed keys", i) + require.Equal(t, sk.Length, entry.Address.ValueSize()) + got, err := seg.Read(entry.Key, entry.Address) + require.NoError(t, err) + require.Equal(t, value[sk.Offset:sk.Offset+sk.Length], got) + } + + // Kind tagging on the group: middle secondaries are KeyKindSecondary, last is FinalSecondary. + require.Equal(t, types.KeyKindSecondary, byKey["quick"].Kind) + require.Equal(t, types.KeyKindSecondary, byKey["brown"].Kind) + require.Equal(t, types.KeyKindFinalSecondary, byKey["whole"].Kind) + + // Standalone Put: single record tagged KeyKindStandalone. + standalone := byKey[string(standaloneKey)] + require.NotNil(t, standalone) + require.Equal(t, types.KeyKindStandalone, standalone.Kind) + got, err = seg.Read(standalone.Key, standalone.Address) + require.NoError(t, err) + require.Equal(t, standaloneValue, got) +} + +// TestKeyFileKindRoundTrip writes one of each KeyKind through Segment.Write, seals, reloads via +// LoadSegment, and verifies via GetKeys that the on-disk record kinds round-trip exactly. This +// locks in the on-disk byte ordering for the future "last-durable-primary" iteration PR. +func TestKeyFileKindRoundTrip(t *testing.T) { + t.Parallel() + + logger := slog.Default() + seg, segmentPath, index := newSingleShardSegment(t) + + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("standalone"), + Value: []byte("v0"), + }) + + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p1"), + Value: []byte("hello world"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("hello"), Offset: 0, Length: 5}, + }, + }) + + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p2"), + Value: []byte("alphabet"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("alpha"), Offset: 0, Length: 5}, + {Key: []byte("bet"), Offset: 5, Length: 3}, + }, + }) + + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + // Reload from disk and verify the on-disk record kinds. + seg2, err := LoadSegment( + logger, + util.NewErrorMonitor(t.Context(), logger, nil), + index, + []*SegmentPath{segmentPath}, + false, + time.Now(), + false, + ) + require.NoError(t, err) + + keys, err := seg2.GetKeys() + require.NoError(t, err) + require.Len(t, keys, 6) + + // Record order is insertion order within the single key file goroutine. + expected := []struct { + key string + kind types.KeyKind + }{ + {"standalone", types.KeyKindStandalone}, + {"p1", types.KeyKindPrimary}, + {"hello", types.KeyKindFinalSecondary}, + {"p2", types.KeyKindPrimary}, + {"alpha", types.KeyKindSecondary}, + {"bet", types.KeyKindFinalSecondary}, + } + for i, exp := range expected { + require.Equal(t, exp.key, string(keys[i].Key), "record %d key mismatch", i) + require.Equal(t, exp.kind, keys[i].Kind, "record %d kind mismatch (key=%s)", i, exp.key) + } +} + +// markSegmentUnsealed flips the sealed byte on the segment's metadata file from 1 back to 0, +// simulating a segment that crashed before it could write the sealed metadata. We can't use a +// running segment for this because the Seal call is what shuts down the segment's goroutines; the +// pattern is to fully seal, then reach into the file system and corrupt the metadata. +func markSegmentUnsealed(t *testing.T, segmentPath *SegmentPath, index uint32) { + t.Helper() + metaPath := path.Join(segmentPath.SegmentDirectory(), fmt.Sprintf("%d%s", index, MetadataFileExtension)) + data, err := os.ReadFile(metaPath) + require.NoError(t, err) + require.Equal(t, V3MetadataSize, len(data)) + data[V3MetadataSize-1] = 0 + require.NoError(t, os.WriteFile(metaPath, data, 0600)) +} + +// truncateKeyFileBy truncates the segment's key file by `bytes` bytes from the end. +func truncateKeyFileBy(t *testing.T, segmentPath *SegmentPath, index uint32, bytes int) { + t.Helper() + keyPath := path.Join(segmentPath.SegmentDirectory(), fmt.Sprintf("%d%s", index, KeyFileExtension)) + data, err := os.ReadFile(keyPath) + require.NoError(t, err) + require.GreaterOrEqual(t, len(data), bytes) + require.NoError(t, os.WriteFile(keyPath, data[:len(data)-bytes], 0600)) +} + +// truncateValueFileBy truncates the segment's value file for the given shard by `bytes` bytes +// from the end. +func truncateValueFileBy(t *testing.T, segmentPath *SegmentPath, index uint32, shard uint8, bytes int) { + t.Helper() + valPath := path.Join(segmentPath.SegmentDirectory(), fmt.Sprintf("%d-%d%s", index, shard, ValuesFileExtension)) + data, err := os.ReadFile(valPath) + require.NoError(t, err) + require.GreaterOrEqual(t, len(data), bytes) + require.NoError(t, os.WriteFile(valPath, data[:len(data)-bytes], 0600)) +} + +// reloadSegmentExpectingRecovery reloads a segment after corrupting it. Returns the post-recovery +// key list (sorted by insertion order from the key file). +func reloadSegmentExpectingRecovery(t *testing.T, segmentPath *SegmentPath, index uint32) ([]*types.ScopedKey, *Segment) { + t.Helper() + logger := slog.Default() + seg, err := LoadSegment( + logger, + util.NewErrorMonitor(t.Context(), logger, nil), + index, + []*SegmentPath{segmentPath}, + false, + time.Now(), + false, + ) + require.NoError(t, err) + keys, err := seg.GetKeys() + require.NoError(t, err) + return keys, seg +} + +// TestSealLoadedSegmentGroupAtomicity covers all of the torn-write scenarios that +// sealLoadedSegment must handle. Each subtest builds a sealed segment, manually corrupts it on +// disk to simulate a crash mid-write, flips the metadata's sealed bit back to false, then reloads +// and asserts which keys are kept and which are dropped. The "all-or-nothing per group" invariant +// is the property under test. +func TestSealLoadedSegmentGroupAtomicity(t *testing.T) { + t.Parallel() + + // Each test case writes a sequence of PutRequests, then describes how to corrupt the on-disk + // files before recovery. expectedKeys lists the keys (in key-file order) that should survive. + t.Run("clean_standalone_survives", func(t *testing.T) { + t.Parallel() + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{Key: []byte("k1"), Value: []byte("v1")}) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 1) + require.Equal(t, "k1", string(keys[0].Key)) + require.Equal(t, types.KeyKindStandalone, keys[0].Kind) + }) + + t.Run("clean_group_survives", func(t *testing.T) { + t.Parallel() + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p"), + Value: []byte("hello"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("he"), Offset: 0, Length: 2}, + {Key: []byte("llo"), Offset: 2, Length: 3}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 3) + require.Equal(t, types.KeyKindPrimary, keys[0].Kind) + require.Equal(t, types.KeyKindSecondary, keys[1].Kind) + require.Equal(t, types.KeyKindFinalSecondary, keys[2].Kind) + }) + + t.Run("primary_without_terminator_discarded", func(t *testing.T) { + t.Parallel() + // A Put of primary + 2 secondaries with the key file truncated such that only the primary + // record remains. The primary has Kind=KeyKindPrimary but no FinalSecondary closes it, so + // the whole group must be discarded. + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p"), + Value: []byte("hello"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("he"), Offset: 0, Length: 2}, + {Key: []byte("llo"), Offset: 2, Length: 3}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + secondaryRecBytes := int(keyRecordSize([]byte("he")) + keyRecordSize([]byte("llo"))) + truncateKeyFileBy(t, segmentPath, index, secondaryRecBytes) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Empty(t, keys) + }) + + t.Run("primary_plus_partial_secondaries_discarded", func(t *testing.T) { + t.Parallel() + // Primary + 2 secondaries, key file truncated to drop the FinalSecondary record. Group is + // torn (no closing terminator), discard. + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p"), + Value: []byte("hello"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("he"), Offset: 0, Length: 2}, + {Key: []byte("llo"), Offset: 2, Length: 3}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + truncateKeyFileBy(t, segmentPath, index, int(keyRecordSize([]byte("llo")))) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Empty(t, keys) + }) + + t.Run("partial_key_record_discarded", func(t *testing.T) { + t.Parallel() + // Truncate the file mid-record (cut into the middle of a key's bytes). readKeys will stop + // at that point and recovery should not commit the in-flight group. + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("standalone-kept"), + Value: []byte("v0"), + }) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("torn-primary"), + Value: []byte("hello"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("torn-secondary"), Offset: 0, Length: 5}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + truncateKeyFileBy(t, segmentPath, index, 5) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 1) + require.Equal(t, "standalone-kept", string(keys[0].Key)) + }) + + t.Run("group_discarded_when_value_file_torn", func(t *testing.T) { + t.Parallel() + // Primary + secondaries written; we truncate the value file so the primary's address (the + // one with the largest [offset, offset+len) span) no longer fits. The whole group must drop — + // even though a short secondary at the front of the value would individually fit. + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("standalone-kept"), + Value: []byte("survivor"), + }) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("torn-primary"), + Value: []byte("hellooooo"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("he"), Offset: 0, Length: 2}, + {Key: []byte("oo"), Offset: 7, Length: 2}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + truncateValueFileBy(t, segmentPath, index, 0, 3) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 1) + require.Equal(t, "standalone-kept", string(keys[0].Key)) + }) + + t.Run("group_survives_when_value_file_complete", func(t *testing.T) { + t.Parallel() + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("p"), + Value: []byte("hellooooo"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("he"), Offset: 0, Length: 2}, + {Key: []byte("oo"), Offset: 7, Length: 2}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 3) + }) + + t.Run("two_clean_groups_plus_torn_third", func(t *testing.T) { + t.Parallel() + seg, segmentPath, index := newSingleShardSegment(t) + writeNoErr(t, seg, &types.PutRequest{Key: []byte("first"), Value: []byte("v1")}) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("second-primary"), + Value: []byte("hi"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("second-secondary"), Offset: 0, Length: 2}, + }, + }) + writeNoErr(t, seg, &types.PutRequest{ + Key: []byte("third-primary"), + Value: []byte("hi"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("third-secondary"), Offset: 0, Length: 2}, + }, + }) + _, err := seg.Seal(time.Now()) + require.NoError(t, err) + + truncateKeyFileBy(t, segmentPath, index, int(keyRecordSize([]byte("third-secondary")))) + markSegmentUnsealed(t, segmentPath, index) + + keys, _ := reloadSegmentExpectingRecovery(t, segmentPath, index) + require.Len(t, keys, 3) + require.Equal(t, "first", string(keys[0].Key)) + require.Equal(t, "second-primary", string(keys[1].Key)) + require.Equal(t, "second-secondary", string(keys[2].Key)) + }) +} diff --git a/sei-db/db_engine/litt/disktable/segment/segment_version.go b/sei-db/db_engine/litt/disktable/segment/segment_version.go index 0ad9652fb6..a2c406e9bf 100644 --- a/sei-db/db_engine/litt/disktable/segment/segment_version.go +++ b/sei-db/db_engine/litt/disktable/segment/segment_version.go @@ -9,12 +9,21 @@ package segment type SegmentVersion uint32 const ( - // ShardedAddressSegmentVersion is the on-disk format that: - // - Replaces the legacy 8-byte address + separate value size in the key file with the 13-byte sharded - // Address layout (index, offset, shardID, valueSize). The keymap stores the same layout. - // - Drops the per-segment hashing salt from the metadata file. Shards are assigned to values in - // round-robin order at write time, which makes the key->shard mapping unpredictable to outside - // callers without needing a hash function or any randomness in the metadata. + // ShardedAddressSegmentVersion is the current on-disk format. It defines: + // - The 13-byte sharded Address layout in the key file (index, offset, shardID, valueSize). The + // keymap stores the same layout. + // - No per-segment hashing salt in the metadata file; shards are assigned to values in round-robin + // order at write time, which makes the key->shard mapping unpredictable to outside callers + // without needing a hash function or any randomness in the metadata. + // - No per-value length prefix in value files. The length lives only in the Address that points + // at the value, which lets secondary keys alias sub-ranges of a value without duplicating data. + // - Per-record `| kind(u8) | keyLen(u16) | key | address(13) |` layout in the key file. The kind + // byte distinguishes primary keys from secondary keys and marks group boundaries used at + // recovery time to discard torn writes atomically. Key length is capped at 64 KiB. + // + // The constant name predates the value-file and key-file changes; it is retained because no + // instance of this codebase has been deployed to production, so there is no compatibility cost to + // folding the new format into the same version number rather than bumping it. ShardedAddressSegmentVersion SegmentVersion = 3 ) diff --git a/sei-db/db_engine/litt/disktable/segment/value_file.go b/sei-db/db_engine/litt/disktable/segment/value_file.go index a23123193d..5cfd031a76 100644 --- a/sei-db/db_engine/litt/disktable/segment/value_file.go +++ b/sei-db/db_engine/litt/disktable/segment/value_file.go @@ -2,7 +2,6 @@ package segment import ( "bufio" - "encoding/binary" "fmt" "io" "log/slog" @@ -192,12 +191,13 @@ func (v *valueFile) path() string { return path.Join(v.segmentPath.SegmentDirectory(), v.name()) } -// read reads a value from the value file. -func (v *valueFile) read(firstByteIndex uint32) ([]byte, error) { +// read reads a length-byte range from the value file. The length is supplied by the caller (it lives in the +// Address that points at this value) so the value file itself stores no length prefix. +func (v *valueFile) read(firstByteIndex uint32, length uint32) ([]byte, error) { flushedSize := v.flushedSize.Load() - if uint64(firstByteIndex) >= flushedSize { - return nil, fmt.Errorf("index %d is out of bounds (current flushed size is %d)", - firstByteIndex, flushedSize) + if uint64(firstByteIndex)+uint64(length) > flushedSize { + return nil, fmt.Errorf("range [%d, %d) is out of bounds (current flushed size is %d)", + firstByteIndex, uint64(firstByteIndex)+uint64(length), flushedSize) } file, err := os.OpenFile(v.path(), os.O_RDONLY, 0600) //nolint:gosec // path validated by segment manager @@ -212,18 +212,12 @@ func (v *valueFile) read(firstByteIndex uint32) ([]byte, error) { }() _, err = file.Seek(int64(firstByteIndex), 0) - reader := bufio.NewReader(file) - - // Read the length of the value. - var length uint32 - err = binary.Read(reader, binary.BigEndian, &length) if err != nil { - return nil, fmt.Errorf("failed to read value length from value file: %v", err) + return nil, fmt.Errorf("failed to seek value file: %v", err) } - // Read the value itself. value := make([]byte, length) - bytesRead, err := io.ReadFull(reader, value) + bytesRead, err := io.ReadFull(file, value) if err != nil { return nil, fmt.Errorf("failed to read value from value file: %v", err) } @@ -236,6 +230,9 @@ func (v *valueFile) read(firstByteIndex uint32) ([]byte, error) { } // write writes a value to the value file, returning the index of the first byte written. +// +// Values are written without a length prefix; the length is recorded in the Address returned by the +// owning segment, which lets secondary keys point at sub-ranges of a value without duplicating data. func (v *valueFile) write(value []byte) (uint32, error) { if v.writer == nil { return 0, fmt.Errorf("value file is sealed") @@ -249,19 +246,12 @@ func (v *valueFile) write(value []byte) (uint32, error) { firstByteIndex := uint32(v.size) - // First, write the length of the value. - err := binary.Write(v.writer, binary.BigEndian, uint32(len(value))) //nolint:gosec // value length fits uint32 - if err != nil { - return 0, fmt.Errorf("failed to write value length to value file: %v", err) - } - - // Then, write the value itself. - _, err = v.writer.Write(value) + _, err := v.writer.Write(value) if err != nil { return 0, fmt.Errorf("failed to write value to value file: %v", err) } - v.size += uint64(len(value) + 4) //nolint:gosec // value length non-negative + v.size += uint64(len(value)) //nolint:gosec // value length non-negative return firstByteIndex, nil } diff --git a/sei-db/db_engine/litt/disktable/segment/value_file_test.go b/sei-db/db_engine/litt/disktable/segment/value_file_test.go index a2668f9697..ff4872cc63 100644 --- a/sei-db/db_engine/litt/disktable/segment/value_file_test.go +++ b/sei-db/db_engine/litt/disktable/segment/value_file_test.go @@ -9,6 +9,14 @@ import ( "github.com/stretchr/testify/require" ) +// valueLocation pairs the (offset, length) of a written value so callers can later read it back. +// The value file no longer stores a length prefix, so callers must remember the length themselves +// (in production code, the length lives in the key file's Address record). +type valueLocation struct { + offset uint32 + length uint32 +} + func TestWriteThenReadValues(t *testing.T) { t.Parallel() rand := util.NewTestRandom() @@ -22,11 +30,11 @@ func TestWriteThenReadValues(t *testing.T) { expectedFileSize := uint64(0) for i := 0; i < int(valueCount); i++ { values[i] = rand.VariableBytes(1, 100) - expectedFileSize += uint64(len(values[i])) + 4 /* length uint32 */ + expectedFileSize += uint64(len(values[i])) } - // A map from the first byte index of the value to the value itself. - addressMap := make(map[uint32][]byte) + // A map from the location of the value to the value itself. + addressMap := make(map[valueLocation][]byte) segmentPath, err := NewSegmentPath(directory, "", "table") require.NoError(t, err) @@ -38,7 +46,8 @@ func TestWriteThenReadValues(t *testing.T) { for _, value := range values { address, err := file.write(value) require.NoError(t, err) - addressMap[address] = value + loc := valueLocation{offset: address, length: uint32(len(value))} //nolint:gosec // bounded + addressMap[loc] = value // Occasionally flush the file to disk. if rand.BoolWithProbability(0.25) { @@ -50,8 +59,8 @@ func TestWriteThenReadValues(t *testing.T) { if rand.BoolWithProbability(0.1) { err = file.flush() require.NoError(t, err) - for key, val := range addressMap { - readValue, err := file.read(key) + for loc, val := range addressMap { + readValue, err := file.read(loc.offset, loc.length) require.NoError(t, err) require.Equal(t, val, readValue) } @@ -61,8 +70,8 @@ func TestWriteThenReadValues(t *testing.T) { // Seal the file and read all values. err = file.seal() require.NoError(t, err) - for key, val := range addressMap { - readValue, err := file.read(key) + for loc, val := range addressMap { + readValue, err := file.read(loc.offset, loc.length) require.NoError(t, err) require.Equal(t, val, readValue) } @@ -72,13 +81,14 @@ func TestWriteThenReadValues(t *testing.T) { require.NoError(t, err) actualFileSize := uint64(stat.Size()) require.Equal(t, actualFileSize, reportedFileSize) + require.Equal(t, expectedFileSize, reportedFileSize) // Create a new in-memory instance from the on-disk file and verify that it behaves the same. file2, err := loadValueFile(logger, index, shard, []*SegmentPath{segmentPath}) require.NoError(t, err) require.Equal(t, file.size, file2.size) - for key, val := range addressMap { - readValue, err := file2.read(key) + for loc, val := range addressMap { + readValue, err := file2.read(loc.offset, loc.length) require.NoError(t, err) require.Equal(t, val, readValue) } @@ -109,8 +119,8 @@ func TestReadingTruncatedValueFile(t *testing.T) { values[i] = rand.VariableBytes(1, 100) } - // A map from the first byte index of the value to the value itself. - addressMap := make(map[uint32][]byte) + // A map from the location of the value to the value itself. + addressMap := make(map[valueLocation][]byte) segmentPath, err := NewSegmentPath(directory, "", "table") require.NoError(t, err) @@ -119,18 +129,21 @@ func TestReadingTruncatedValueFile(t *testing.T) { file, err := createValueFile(logger, index, shard, segmentPath, false) require.NoError(t, err) - var lastAddress uint32 + var lastLoc valueLocation for _, value := range values { address, err := file.write(value) require.NoError(t, err) - addressMap[address] = value - lastAddress = address + loc := valueLocation{offset: address, length: uint32(len(value))} //nolint:gosec // bounded + addressMap[loc] = value + lastLoc = loc } err = file.seal() require.NoError(t, err) - // Truncate the file. Chop off some bytes from the last value, but do not corrupt the length prefix. + // Truncate the file by chopping off some bytes from the end of the last value. Without the + // length prefix in the file, every byte we cut off is value data, so reads of the last value + // must fail and every other value must still read back correctly. lastValueLength := len(values[valueCount-1]) filePath := file.path() @@ -148,34 +161,12 @@ func TestReadingTruncatedValueFile(t *testing.T) { require.NoError(t, err) // We should be able to read all values except for the last one. - for key, val := range addressMap { - if key == lastAddress { - _, err := file.read(key) - require.Error(t, err) - } else { - readValue, err := file.read(key) - require.NoError(t, err) - require.Equal(t, val, readValue) - } - } - - // Truncate the file. Corrupt the length prefix of the last value. - prefixBytesToRemove := rand.Int32Range(1, 4) - bytes = originalBytes[:len(originalBytes)-int(prefixBytesToRemove)] - - err = os.WriteFile(filePath, bytes, 0644) - require.NoError(t, err) - - file, err = loadValueFile(logger, index, shard, []*SegmentPath{segmentPath}) - require.NoError(t, err) - - // We should be able to read all values except for the last one. - for key, val := range addressMap { - if key == lastAddress { - _, err := file.read(key) + for loc, val := range addressMap { + if loc == lastLoc { + _, err := file.read(loc.offset, loc.length) require.Error(t, err) } else { - readValue, err := file.read(key) + readValue, err := file.read(loc.offset, loc.length) require.NoError(t, err) require.Equal(t, val, readValue) } diff --git a/sei-db/db_engine/litt/memtable/mem_table.go b/sei-db/db_engine/litt/memtable/mem_table.go index ff322d55a4..2fbb5f875b 100644 --- a/sei-db/db_engine/litt/memtable/mem_table.go +++ b/sei-db/db_engine/litt/memtable/mem_table.go @@ -92,33 +92,123 @@ func (m *memTable) KeyCount() uint64 { return uint64(len(m.data)) } -func (m *memTable) Put(key []byte, value []byte) error { - stringKey := string(key) - expiration := &expirationRecord{ - creationTime: m.clock(), - key: stringKey, +func (m *memTable) Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error { + // Validate first so a failed validation never leaves a partial insert behind. + if key == nil { + return fmt.Errorf("nil keys are not supported") + } + if value == nil { + return fmt.Errorf("nil values are not supported") + } + seen := make(map[string]struct{}, 1+len(secondaryKeys)) + seen[string(key)] = struct{}{} + for _, sk := range secondaryKeys { + if sk == nil { + return fmt.Errorf("nil secondary key is not supported") + } + if sk.Key == nil { + return fmt.Errorf("nil secondary key bytes are not supported") + } + end := uint64(sk.Offset) + uint64(sk.Length) + if end > uint64(len(value)) { + return fmt.Errorf( + "secondary key range [%d, %d) exceeds value length %d", sk.Offset, end, len(value)) + } + skKey := string(sk.Key) + if _, dup := seen[skKey]; dup { + return fmt.Errorf("duplicate key %x within Put", sk.Key) + } + seen[skKey] = struct{}{} } + stringKey := string(key) + now := m.clock() + m.lock.Lock() defer m.lock.Unlock() - _, ok := m.data[stringKey] - if ok { + if _, ok := m.data[stringKey]; ok { return fmt.Errorf("key %x already exists", key) } + for _, sk := range secondaryKeys { + if _, ok := m.data[string(sk.Key)]; ok { + return fmt.Errorf("secondary key %x already exists", sk.Key) + } + } + m.data[stringKey] = value - m.expirationQueue.Push(expiration) + m.expirationQueue.Push(&expirationRecord{creationTime: now, key: stringKey}) + for _, sk := range secondaryKeys { + skString := string(sk.Key) + m.data[skString] = value[sk.Offset : sk.Offset+sk.Length] + m.expirationQueue.Push(&expirationRecord{creationTime: now, key: skString}) + } return nil } -func (m *memTable) PutBatch(batch []*types.KVPair) error { - for _, kv := range batch { - err := m.Put(kv.Key, kv.Value) - if err != nil { - return err +func (m *memTable) PutBatch(batch []*types.PutRequest) error { + // Stateless validation pass: matches single-Put validation rules. If any request is + // invalid, the entire batch is rejected before any writes are applied. This mirrors the + // validation-atomic behavior of DiskTable.PutBatch. + for _, req := range batch { + if req.Key == nil { + return fmt.Errorf("nil keys are not supported") + } + if req.Value == nil { + return fmt.Errorf("nil values are not supported") + } + seen := make(map[string]struct{}, 1+len(req.SecondaryKeys)) + seen[string(req.Key)] = struct{}{} + for _, sk := range req.SecondaryKeys { + if sk == nil { + return fmt.Errorf("nil secondary key is not supported") + } + if sk.Key == nil { + return fmt.Errorf("nil secondary key bytes are not supported") + } + end := uint64(sk.Offset) + uint64(sk.Length) + if end > uint64(len(req.Value)) { + return fmt.Errorf( + "secondary key range [%d, %d) exceeds value length %d", sk.Offset, end, len(req.Value)) + } + skKey := string(sk.Key) + if _, dup := seen[skKey]; dup { + return fmt.Errorf("duplicate key %x within PutRequest", sk.Key) + } + seen[skKey] = struct{}{} + } + } + + now := m.clock() + + m.lock.Lock() + defer m.lock.Unlock() + + // Collision pass: ensure no key in any request already exists in the table. Held under the + // same lock as the apply pass, so the batch as a whole succeeds or fails atomically. + for _, req := range batch { + if _, ok := m.data[string(req.Key)]; ok { + return fmt.Errorf("key %x already exists", req.Key) + } + for _, sk := range req.SecondaryKeys { + if _, ok := m.data[string(sk.Key)]; ok { + return fmt.Errorf("secondary key %x already exists", sk.Key) + } } } + + for _, req := range batch { + stringKey := string(req.Key) + m.data[stringKey] = req.Value + m.expirationQueue.Push(&expirationRecord{creationTime: now, key: stringKey}) + for _, sk := range req.SecondaryKeys { + skString := string(sk.Key) + m.data[skString] = req.Value[sk.Offset : sk.Offset+sk.Length] + m.expirationQueue.Push(&expirationRecord{creationTime: now, key: skString}) + } + } + return nil } diff --git a/sei-db/db_engine/litt/table.go b/sei-db/db_engine/litt/table.go index 58c189b77b..4bfa2dc210 100644 --- a/sei-db/db_engine/litt/table.go +++ b/sei-db/db_engine/litt/table.go @@ -22,25 +22,33 @@ type Table interface { // Note that when this method returns, data written may not be crash durable on disk // (although the write does have atomicity). In order to ensure crash durability, call Flush(). // - // The maximum size of the key is 2^32 bytes. The maximum size of the value is 2^32 bytes. - // This database has been optimized under the assumption that values are generally much larger than keys. - // This affects performance, but not correctness. + // Optional secondary keys may be supplied; each secondary key acts as an additional alias for a + // sub-range of the value (or the whole value, when Offset=0 and Length=len(value)). Secondary + // keys are first-class keys: they appear in KeyCount(), Get(), Exists(), and are subject to the + // same TTL as the primary. They share the value's bytes on disk, so they cost one keymap entry + // each and do not duplicate value bytes. Secondary keys must be globally unique just like + // primary keys, and must not collide with the primary key or other secondaries. + // + // The maximum size of a key (primary or secondary) is 64 KiB (2^16 - 1 bytes). The maximum size + // of the value is 2^32 bytes. This database has been optimized under the assumption that values + // are generally much larger than keys. This affects performance, but not correctness. // // It is not safe to modify the byte slices passed to this function after the call - // (both the key and the value). - Put(key []byte, value []byte) error + // (the key bytes, the value bytes, and every secondary key's bytes). + Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error // PutBatch stores multiple values in the database. Similar to Put, but allows for multiple values to be written - // at once. This may improve performance, but it otherwise has identical properties to a sequence of Put calls - // (i.e. this method does not atomically write the entire batch). + // at once, which may improve performance. // - // The maximum size of a key is 2^32 bytes. The maximum size of a value is 2^32 bytes. - // This database has been optimized under the assumption that values are generally much larger than keys. - // This affects performance, but not correctness. + // Each PutRequest may include zero or more secondary keys (see Put for semantics). + // + // The maximum size of a key (primary or secondary) is 64 KiB (2^16 - 1 bytes). The maximum size + // of a value is 2^32 bytes. This database has been optimized under the assumption that values + // are generally much larger than keys. This affects performance, but not correctness. // // It is not safe to modify the byte slices passed to this function after the call - // (including the key byte slices and the value byte slices). - PutBatch(batch []*types.KVPair) error + // (including the key byte slices, the value byte slices, and every secondary key's bytes). + PutBatch(batch []*types.PutRequest) error // Get retrieves a value from the database. The returned boolean indicates whether the key exists in the database // (returns false if the key does not exist). If an error is returned, the value of the other returned values are diff --git a/sei-db/db_engine/litt/test/db_test.go b/sei-db/db_engine/litt/test/db_test.go index 70be8a5089..f48652b9c0 100644 --- a/sei-db/db_engine/litt/test/db_test.go +++ b/sei-db/db_engine/litt/test/db_test.go @@ -154,11 +154,11 @@ func randomDBOperationsTest(t *testing.T, builder *dbBuilder) { require.NoError(t, err) expectedValues[tableName][string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[tableName][string(key)] = value } err = table.PutBatch(batch) @@ -284,11 +284,11 @@ func dbRestartTest(t *testing.T, builder *dbBuilder) { require.NoError(t, err) expectedValues[tableName][string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[tableName][string(key)] = value } err = table.PutBatch(batch) diff --git a/sei-db/db_engine/litt/test/keymap_migration_test.go b/sei-db/db_engine/litt/test/keymap_migration_test.go index 4829b3ea96..afd839ed7f 100644 --- a/sei-db/db_engine/litt/test/keymap_migration_test.go +++ b/sei-db/db_engine/litt/test/keymap_migration_test.go @@ -57,11 +57,11 @@ func TestKeymapMigration(t *testing.T) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -206,11 +206,11 @@ func TestFailedKeymapMigration(t *testing.T) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) diff --git a/sei-db/db_engine/litt/test/migration_data.go b/sei-db/db_engine/litt/test/migration_data.go index 321f3f9664..2d10745e0b 100644 --- a/sei-db/db_engine/litt/test/migration_data.go +++ b/sei-db/db_engine/litt/test/migration_data.go @@ -1,7 +1,71 @@ package test -// This map is used for migration tests. This data is written to a table at the old version, and used to verify that -// the data after migration is the same as the data before migration. +import "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/types" + +// migrationPuts is the canonical input written to the migration-test fixture. It mirrors what real +// callers do: a sequence of Puts, some of which include secondary keys. Three primaries near the +// end carry secondaries that exercise every KeyKind path: +// +// - "kindStandalone-primary" is a 0-secondary Put (covered by every other entry too, but called +// out by name for readability). +// - "kindPrimary-with-one-secondary" carries exactly one secondary, exercising +// KeyKindPrimary + KeyKindFinalSecondary. +// - "kindPrimary-with-three-secondaries" carries three secondaries (a mix of strict sub-range +// and alias-the-whole-value), exercising KeyKindPrimary + 2× KeyKindSecondary + +// KeyKindFinalSecondary. +// +// Cross-version migration verifies that every primary AND every secondary survives the round +// trip through whatever the current on-disk format happens to be. +var migrationPuts = func() []*types.PutRequest { + out := make([]*types.PutRequest, 0, len(migrationData)+3) + for key, value := range migrationData { + out = append(out, &types.PutRequest{Key: []byte(key), Value: []byte(value)}) + } + + out = append(out, + &types.PutRequest{ + Key: []byte("kindStandalone-primary"), + Value: []byte("standalone"), + }, + &types.PutRequest{ + Key: []byte("kindPrimary-with-one-secondary"), + Value: []byte("hello world"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("kindFinal-only-secondary"), Offset: 0, Length: 5}, // "hello" + }, + }, + &types.PutRequest{ + Key: []byte("kindPrimary-with-three-secondaries"), + Value: []byte("the quick brown fox jumps over the lazy dog"), + SecondaryKeys: []*types.SecondaryKey{ + {Key: []byte("kindMid-quick"), Offset: 4, Length: 5}, // "quick" + {Key: []byte("kindMid-brown"), Offset: 10, Length: 5}, // "brown" + {Key: []byte("kindFinal-alias-whole"), Offset: 0, Length: 43 /* len of the value */}, // alias the whole value + }, + }, + ) + + return out +}() + +// expectedMigrationKVs flattens migrationPuts into a single map from key bytes -> expected bytes. +// Every primary key maps to its full value; every secondary key maps to the sub-range of its +// parent's value bytes that it points at. +var expectedMigrationKVs = func() map[string]string { + out := make(map[string]string, len(migrationPuts)) + for _, p := range migrationPuts { + out[string(p.Key)] = string(p.Value) + for _, sk := range p.SecondaryKeys { + out[string(sk.Key)] = string(p.Value[sk.Offset : sk.Offset+sk.Length]) + } + } + return out +}() + +// migrationData is the original key->value fixture from v3 and earlier. Newer fixtures extend it +// via migrationPuts above; keeping migrationData as a standalone map preserves the historical +// payload (so generated v3 data remains byte-for-byte identical were one to regenerate it under +// the old code). var migrationData = map[string]string{ "S7MOxfceWW": "oSNhtpEtRb48ntgPkhL", "uQxQ25apaahwztuOzNi": "Tn2MgaTP5B", diff --git a/sei-db/db_engine/litt/test/migration_test.go b/sei-db/db_engine/litt/test/migration_test.go index 3648155996..0199533589 100644 --- a/sei-db/db_engine/litt/test/migration_test.go +++ b/sei-db/db_engine/litt/test/migration_test.go @@ -49,13 +49,13 @@ func TestGenerateData(t *testing.T) { table, err := db.GetTable("test") require.NoError(t, err) - for key, value := range migrationData { - err = table.Put([]byte(key), []byte(value)) + for _, p := range migrationPuts { + err = table.Put(p.Key, p.Value, p.SecondaryKeys...) require.NoError(t, err) } // verify the data in the table - for key, value := range migrationData { + for key, value := range expectedMigrationKVs { v, exists, err := table.Get([]byte(key)) require.NoError(t, err) require.True(t, exists) @@ -129,11 +129,11 @@ func testMigration(t *testing.T, migrationPath string) { table, err := db.GetTable("test") require.NoError(t, err) - // Verify the data in the table matches our expected data - for key, value := range migrationData { + // Verify the data in the table matches our expected data (including secondary keys). + for key, value := range expectedMigrationKVs { v, exists, err := table.Get([]byte(key)) require.NoError(t, err) - require.True(t, exists) + require.True(t, exists, "key %q missing after migration", key) require.Equal(t, value, string(v)) } @@ -158,7 +158,7 @@ func testMigration(t *testing.T, migrationPath string) { } // Verify the original data. - for key, value := range migrationData { + for key, value := range expectedMigrationKVs { v, exists, err := table.Get([]byte(key)) require.NoError(t, err, "Error reading migration data") require.True(t, exists, "Migration data doesn't exist") @@ -177,7 +177,7 @@ func testMigration(t *testing.T, migrationPath string) { require.NoError(t, err, "Failed to get table after reopening") // Verify original migration data is still intact - for key, value := range migrationData { + for key, value := range expectedMigrationKVs { v, exists, err := table.Get([]byte(key)) require.NoError(t, err, "Error reading migration data after reopen") require.True(t, exists, "Migration data doesn't exist after reopen") diff --git a/sei-db/db_engine/litt/test/table_test.go b/sei-db/db_engine/litt/test/table_test.go index de4e8ed0c9..0fc208159e 100644 --- a/sei-db/db_engine/litt/test/table_test.go +++ b/sei-db/db_engine/litt/test/table_test.go @@ -290,11 +290,11 @@ func randomTableOperationsTest(t *testing.T, tableBuilder *tableBuilder) { require.NoError(t, err) expectedValues[string(key)] = value } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value } err = table.PutBatch(batch) @@ -407,11 +407,11 @@ func garbageCollectionTest(t *testing.T, tableBuilder *tableBuilder) { expectedValues[string(key)] = value creationTimes[string(key)] = newTime } else { - batch := make([]*types.KVPair, 0, batchSize) + batch := make([]*types.PutRequest, 0, batchSize) for j := int32(0); j < batchSize; j++ { key := rand.PrintableVariableBytes(32, 64) value := rand.PrintableVariableBytes(1, 128) - batch = append(batch, &types.KVPair{Key: key, Value: value}) + batch = append(batch, &types.PutRequest{Key: key, Value: value}) expectedValues[string(key)] = value creationTimes[string(key)] = newTime } @@ -549,3 +549,114 @@ func TestInvalidTableName(t *testing.T) { require.Error(t, err) require.Nil(t, table) } + +// secondaryKeyBasicsTest runs against every table implementation registered in tableBuilders. It +// verifies that secondary keys behave like first-class keys at the Table interface: Put accepts +// them, Get returns the correct sub-range bytes both before and after Flush, Exists reports them +// as present, and KeyCount counts them. +func secondaryKeyBasicsTest(t *testing.T, tb *tableBuilder) { + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tb.builder(time.Now, tableName, directory) + require.NoError(t, err) + + value := []byte("the quick brown fox") + primary := []byte("primary") + sk1 := &types.SecondaryKey{Key: []byte("quick"), Offset: 4, Length: 5} + sk2 := &types.SecondaryKey{Key: []byte("alias"), Offset: 0, Length: uint32(len(value))} + + require.NoError(t, table.Put(primary, value, sk1, sk2)) + + verify := func(stage string) { + t.Helper() + got, ok, err := table.Get(primary) + require.NoError(t, err, stage) + require.True(t, ok, stage) + require.Equal(t, value, got, stage) + + ok, err = table.Exists(sk1.Key) + require.NoError(t, err, stage) + require.True(t, ok, stage) + got, ok, err = table.Get(sk1.Key) + require.NoError(t, err, stage) + require.True(t, ok, stage) + require.Equal(t, value[sk1.Offset:sk1.Offset+sk1.Length], got, stage) + + got, ok, err = table.Get(sk2.Key) + require.NoError(t, err, stage) + require.True(t, ok, stage) + require.Equal(t, value, got, stage) + + require.EqualValues(t, 3, table.KeyCount(), stage) + } + + verify("before flush") + require.NoError(t, table.Flush()) + verify("after flush") + + require.NoError(t, table.Destroy()) +} + +func TestSecondaryKeyBasics(t *testing.T) { + t.Parallel() + for _, tb := range tableBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + secondaryKeyBasicsTest(t, tb) + }) + } +} + +// secondaryKeyCachedWriteHotTest verifies that immediately after Put, both the primary and every +// secondary key are hot in the cached table's write cache (CacheAwareGet with +// onlyReadFromCache=true returns the bytes without touching disk). Skips non-cached +// implementations since CacheAwareGet on those is functionally identical to Get. +func secondaryKeyCachedWriteHotTest(t *testing.T, tb *tableBuilder) { + rand := util.NewTestRandom() + directory := t.TempDir() + tableName := rand.String(8) + table, err := tb.builder(time.Now, tableName, directory) + require.NoError(t, err) + + value := []byte("hello world") + require.NoError(t, table.Put([]byte("primary"), value, + &types.SecondaryKey{Key: []byte("hello"), Offset: 0, Length: 5}, + &types.SecondaryKey{Key: []byte("world"), Offset: 6, Length: 5}, + )) + + for _, kv := range []struct { + key []byte + expected []byte + }{ + {[]byte("primary"), value}, + {[]byte("hello"), []byte("hello")}, + {[]byte("world"), []byte("world")}, + } { + got, ok, hot, err := table.CacheAwareGet(kv.key, true) + require.NoError(t, err, "key=%s", kv.key) + require.True(t, ok, "key=%s", kv.key) + require.True(t, hot, "key=%s expected to be in write cache", kv.key) + require.Equal(t, kv.expected, got, "key=%s", kv.key) + } + + require.NoError(t, table.Destroy()) +} + +func TestSecondaryKeyCachedWriteHot(t *testing.T) { + t.Parallel() + // Cached variants only: the non-cached builders treat CacheAwareGet(_, true) as "miss". + cachedBuilders := []*tableBuilder{ + {"cached memtable", buildCachedMemTable}, + {"cached mem keymap disk table", buildCachedMemKeyDiskTable}, + {"cached pebbledb keymap disk table", buildCachedPebbleDBKeyDiskTable}, + } + for _, tb := range cachedBuilders { + tb := tb + t.Run(tb.name, func(t *testing.T) { + t.Parallel() + secondaryKeyCachedWriteHotTest(t, tb) + }) + } +} diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/keymap/data/000002.log b/sei-db/db_engine/litt/test/testdata/v3/test/keymap/data/000002.log index fc9ba6bff3..f51d65d12d 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/keymap/data/000002.log and b/sei-db/db_engine/litt/test/testdata/v3/test/keymap/data/000002.log differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-0.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-0.values index 4016981995..6daca79886 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-0.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-0.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-1.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-1.values index 489e21f4e9..baaf170b0e 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-1.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-1.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-2.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-2.values index dfaf4da227..f7c862584d 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-2.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-2.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-3.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-3.values index 5f8bf202a1..9f2bb7c622 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-3.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0-3.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.keys b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.keys index 72b989d9d8..60226944b6 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.keys and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.keys differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.metadata b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.metadata index f6b48c068d..62e1a1ad66 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.metadata and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/0.metadata differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-0.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-0.values index d980957c5c..c33a5234f8 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-0.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-0.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-1.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-1.values index 6f4e3a4095..978ae1373f 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-1.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-1.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-2.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-2.values index d4056be1a4..3c9ffd974d 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-2.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-2.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-3.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-3.values index 1ccc5fd0d2..be04be4fbe 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-3.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1-3.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.keys b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.keys index 06157730f2..cfebf44316 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.keys and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.keys differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.metadata b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.metadata index 24abcd0799..0e95125194 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.metadata and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/1.metadata differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-0.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-0.values index 15e0c867f3..27702ca199 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-0.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-0.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-1.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-1.values index 2cbc3466ef..71268f14ff 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-1.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-1.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-2.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-2.values index a3d81abed6..94e5f92f74 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-2.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-2.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-3.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-3.values index aee8429683..a79d4b2864 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-3.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2-3.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.keys b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.keys index 4afd055d86..40c97fd82b 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.keys and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.keys differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.metadata b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.metadata index 5c22da65b6..f5bf816629 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.metadata and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/2.metadata differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-0.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-0.values index d81bf3f77b..393b4265dc 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-0.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-0.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-1.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-1.values index ec56a00dd5..af262a349b 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-1.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-1.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-2.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-2.values index 0be5505d12..52cee97766 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-2.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-2.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-3.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-3.values index d41f6cfb17..3764b786bf 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-3.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3-3.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.keys b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.keys index 8f85e07aa3..fdaba553fa 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.keys and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.keys differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.metadata b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.metadata index 069e8492ec..d7bc3a77de 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.metadata and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/3.metadata differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-0.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-0.values index d322dab4cb..e69de29bb2 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-0.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-0.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-1.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-1.values index d8978de1d6..e69de29bb2 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-1.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-1.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-2.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-2.values index 57bb4d2145..e69de29bb2 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-2.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-2.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-3.values b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-3.values index e87dd639fe..e69de29bb2 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-3.values and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4-3.values differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.keys b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.keys index 6b58409a6c..e69de29bb2 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.keys and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.keys differ diff --git a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.metadata b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.metadata index 125f4bb008..b0edd792d0 100644 Binary files a/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.metadata and b/sei-db/db_engine/litt/test/testdata/v3/test/segments/4.metadata differ diff --git a/sei-db/db_engine/litt/types/key_kind.go b/sei-db/db_engine/litt/types/key_kind.go new file mode 100644 index 0000000000..2eba5957f9 --- /dev/null +++ b/sei-db/db_engine/litt/types/key_kind.go @@ -0,0 +1,25 @@ +package types + +// KeyKind tags each record in the per-segment key file. It distinguishes primary keys from secondary +// keys (which alias sub-ranges of another key's value bytes) and also delimits "groups" written by a +// single Put, used at recovery time to discard torn writes atomically. +// +// Layout on disk: each key-file record begins with a single KeyKind byte. Values 4-255 are reserved +// for future record kinds. +type KeyKind uint8 + +const ( + // KeyKindStandalone is a primary key whose Put did not include any secondary keys. The zero + // value is the default so any ScopedKey constructed without an explicit Kind is treated as an + // ordinary primary key. + KeyKindStandalone KeyKind = 0 + // KeyKindPrimary is a primary key whose Put included at least one secondary; the secondaries + // appear contiguously in the key file immediately after this record and terminate with a + // KeyKindFinalSecondary record. + KeyKindPrimary KeyKind = 1 + // KeyKindSecondary is a secondary key that is not the last secondary in its group. + KeyKindSecondary KeyKind = 2 + // KeyKindFinalSecondary is the last secondary in a group; it terminates the group and signals + // that the group is fully written. + KeyKindFinalSecondary KeyKind = 3 +) diff --git a/sei-db/db_engine/litt/types/kv_pair.go b/sei-db/db_engine/litt/types/kv_pair.go deleted file mode 100644 index 7fbc8bf74e..0000000000 --- a/sei-db/db_engine/litt/types/kv_pair.go +++ /dev/null @@ -1,9 +0,0 @@ -package types - -// KVPair represents a key-value pair. -type KVPair struct { - // Key is the key. - Key []byte - // Value is the value. - Value []byte -} diff --git a/sei-db/db_engine/litt/types/put_request.go b/sei-db/db_engine/litt/types/put_request.go new file mode 100644 index 0000000000..40d2166a8c --- /dev/null +++ b/sei-db/db_engine/litt/types/put_request.go @@ -0,0 +1,11 @@ +package types + +// A request to put a key-value pair with optional secondary keys into the database. +type PutRequest struct { + // Key is the primary key. + Key []byte + // Value is the value to put. Only written once, even if secondary keys are provided. + Value []byte + // Secondary keys pointing to sub-ranges of the value. May be nil. + SecondaryKeys []*SecondaryKey +} diff --git a/sei-db/db_engine/litt/types/scoped_key.go b/sei-db/db_engine/litt/types/scoped_key.go index b785248f40..7dbb6253f1 100644 --- a/sei-db/db_engine/litt/types/scoped_key.go +++ b/sei-db/db_engine/litt/types/scoped_key.go @@ -7,4 +7,9 @@ type ScopedKey struct { Key []byte // The location where the value associated with the key is stored. Address Address + // Kind tags the record's role in the key file: ordinary primary, primary with secondaries to + // follow, or one of the secondaries that follow such a primary. The zero value + // (KeyKindStandalone) means an ordinary primary key, so call sites that do not care about + // secondary keys can construct ScopedKey literals as before. + Kind KeyKind } diff --git a/sei-db/db_engine/litt/types/secondary_key.go b/sei-db/db_engine/litt/types/secondary_key.go new file mode 100644 index 0000000000..474cf99dcd --- /dev/null +++ b/sei-db/db_engine/litt/types/secondary_key.go @@ -0,0 +1,15 @@ +package types + +// A SecondaryKey is used to access specific parts of a value with direct lookups (i.e. without needing to read the +// entire value into memory). It can also be used to alias the entire value to a different key. +type SecondaryKey struct { + // A key in the DB. Similar to primary keys, secondary keys must be globally unique and cannot be modified after + // creation (other than being deleted when the TTL expires). + Key []byte + // The offset of the start of the byte range described by the secondary key. Must be less than or equal to the + // length of the full value associated with the key. + Offset uint32 + // The length of the byte range described by the secondary key. Offset+Length must be less than or equal to the + // length of the full value associated with the key. + Length uint32 +}