Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions sei-db/db_engine/litt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
- [Configuration Options](#configuration-options)
- [CLI](#littdb-cli)
- [Definitions](#definitions)
- [Architecture](docsrchitecture.md)
- [Filesystem Layout](docsilesystem_layout.md)
- [Architecture](docs/architecture.md)
- [Filesystem Layout](docs/filesystem_layout.md)

# What is LittDB?

Expand Down Expand Up @@ -107,38 +107,56 @@ Source: [db.go](db.go)

```go
type DB interface {
GetTable(name string) (Table, error)
DropTable(name string) error
Stop() error
Destroy() error
GetTable(name string) (Table, error)
DropTable(name string) error
Stop() error
Destroy() error
}
```

Source: [table.go](table.go)

```go
type Table interface {
Name() string
Put(key []byte, value []byte) error
PutBatch(batch []*types.KVPair) error
Get(key []byte) ([]byte, bool, error)
Exists(key []byte) (bool, error)
Flush() error
Size() uint64
SetTTL(ttl time.Duration) error
SetCacheSize(size uint64) error
Name() string
Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error
PutBatch(batch []*types.PutRequest) error
Get(key []byte) ([]byte, bool, error)
Exists(key []byte) (bool, error)
Flush() error
Size() uint64
SetTTL(ttl time.Duration) error
SetCacheSize(size uint64) error
}
```

Source: [kv_pair.go](types/kv_pair.go)
Neither primary keys nor secondary keys may exceed 65,535 bytes (2^16 - 1, i.e. just under 64 KiB). Values may be up to 2^32 bytes.

Source: [put_request.go](types/put_request.go)

```go
// PutRequest describes one entry of a Table.PutBatch call: a primary key, its value, and any
// secondary keys that alias byte ranges of that value.
type PutRequest struct {
Key []byte
Value []byte
SecondaryKeys []*SecondaryKey // optional, may be nil
}
```
type KVPair struct {
Key []byte
Value []byte

Source: [secondary_key.go](types/secondary_key.go)

```go
// SecondaryKey is an additional key that resolves to the byte range [Offset, Offset+Length)
// of the value it is written alongside.
type SecondaryKey struct {
Key []byte // a globally unique key alias
Offset uint32 // byte offset into the parent value
Length uint32 // length of the byte range (Offset+Length <= len(Value))
}
```

A secondary key is a first-class key that aliases a sub-range (or the whole) of the parent value's
bytes. `Get`, `Exists`, `KeyCount`, and TTL all treat secondary keys identically to primary keys.
Secondary keys share the value's bytes on disk, so each one costs roughly one keymap entry rather
than duplicating value bytes.

## Getting Started

Below is a functional example showing how to use LittDB.
Expand All @@ -147,17 +165,17 @@ Below is a functional example showing how to use LittDB.
// Configure and build the database.
config, err := littbuilder.DefaultConfig("path/to/where/data/is/stored")
if err != nil {
return err
return err
}

db, err := config.Build(context.Background())
if err != nil {
return err
return err
}

myTable, err := db.GetTable("my-table") // this code works if the table is new or if the table already exists
if err != nil {
return err
return err
}

// Write a key-value pair to the table.
Expand All @@ -166,21 +184,21 @@ value := []byte("this is a value")

err = myTable.Put(key, value)
if err != nil {
return err
return err
}

// Flush the data to disk.
err = myTable.Flush()
if err != nil {
return err
return err
}

// Congratulations! Your data is now durable on disk.

// Read the value back. This works before or after a flush.
val, ok, err := myTable.Get(key)
if err != nil {
return err
return err
}
```

Expand Down
79 changes: 67 additions & 12 deletions sei-db/db_engine/litt/benchmark/data_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path"
"strings"
"sync"
"time"

"github.com/sei-protocol/sei-chain/sei-db/common/unit"
Expand Down Expand Up @@ -94,9 +95,19 @@ type DataTracker struct {
// The size of the values in bytes for new cohorts.
valueSize uint64

// This channel has capacity one and initially has one value in it. This value is drained when the DataTracker is
// fully stopped. Other threads can use this to block until the DataTracker is fully stopped.
closedChan chan struct{}
// Closed by Close() to signal the data generator goroutine to drain any pending
// reports from writtenKeyIndicesChan and then exit. This is what makes Close()
// deterministic with respect to ReportWrite: any ReportWrite call that has
// returned before Close() is invoked is guaranteed to be processed by the time
// Close() returns.
shutdownChan chan struct{}

// Closed by the data generator goroutine when it has fully exited. Used by
// Close() to wait for the goroutine to finish.
doneChan chan struct{}

// Ensures Close() is idempotent.
closeOnce sync.Once

// Used to handle fatal errors in the DataTracker.
errorMonitor *util.ErrorMonitor
Expand Down Expand Up @@ -165,9 +176,6 @@ func NewDataTracker(
safetyMargin := time.Duration(config.ReadSafetyMarginMinutes * float64(time.Minute))
safeTTL := ttl - safetyMargin

closedChan := make(chan struct{}, 1)
closedChan <- struct{}{} // Will be drained when the DataTracker is closed.

ctx, cancel := context.WithCancel(ctx)

tracker := &DataTracker{
Expand All @@ -190,7 +198,8 @@ func NewDataTracker(
safeTTL: safeTTL,
valueSize: valueSize,
generator: NewDataGenerator(config.Seed, config.RandomPoolSize),
closedChan: closedChan,
shutdownChan: make(chan struct{}),
doneChan: make(chan struct{}),
errorMonitor: errorMonitor,
}

Expand Down Expand Up @@ -298,12 +307,17 @@ func (t *DataTracker) GetWriteInfo() *WriteInfo {
}

// ReportWrite is called when a key has been written to the database. This means that the key is now safe to be read.
// Reports submitted before Close() is invoked are guaranteed to be processed by the time Close() returns.
// Once shutdown has been initiated (via Close() or context cancellation), subsequent ReportWrite calls return
// immediately without queuing the report.
//
// If the report cannot be queued right away, ReportWrite blocks until it is accepted or shutdown begins.
func (t *DataTracker) ReportWrite(index uint64) {
	// Check the shutdown signals before attempting the send. When more than one case of a
	// select is ready, Go chooses among them at random, so a single combined select could
	// still queue a report after shutdown whenever writtenKeyIndicesChan has room.
	select {
	case <-t.ctx.Done():
		return
	case <-t.shutdownChan:
		return
	default:
	}

	// Not shut down at the time of the check: queue the report, but give up if shutdown
	// begins while we are waiting for the channel to accept it.
	select {
	case t.writtenKeyIndicesChan <- index:
	case <-t.ctx.Done():
	case <-t.shutdownChan:
	}
}

Expand Down Expand Up @@ -331,19 +345,32 @@ func (t *DataTracker) GetReadInfoWithTimeout(timeout time.Duration) *ReadInfo {
}
}

// Close stops the key manager's background tasks.
// Close stops the DataTracker's background tasks. Close is idempotent.
//
// Close performs a graceful shutdown: any ReportWrite call that returned before Close was invoked
// is guaranteed to be processed (and any cohort completion side effects persisted) by the time
// Close returns. After Close starts, additional ReportWrite calls return immediately without
// queuing the report.
func (t *DataTracker) Close() {
t.cancel()
t.closedChan <- struct{}{}
<-t.closedChan
t.closeOnce.Do(func() {
// Signal the data generator goroutine to drain pending reports and exit. Without this
// step, cancelling the context first could cause the goroutine to exit before
// processing reports that ReportWrite already enqueued, leaving on-disk cohort
// completion state inconsistent with what the caller has reported.
close(t.shutdownChan)
<-t.doneChan
// Cancel the context to unblock any callers still waiting in
// GetWriteInfo/GetReadInfo/GetReadInfoWithTimeout.
t.cancel()
})
}

// dataGenerator is responsible for generating data in the background.
func (t *DataTracker) dataGenerator() {
ticker := time.NewTicker(time.Duration(t.config.CohortGCPeriodSeconds * float64(time.Second)))
defer func() {
ticker.Stop()
<-t.closedChan
close(t.doneChan)
}()

nextWriteInfo := t.generateNextWriteInfo()
Expand All @@ -360,6 +387,10 @@ func (t *DataTracker) dataGenerator() {
return
case <-t.ctx.Done():
return
case <-t.shutdownChan:
// graceful shutdown initiated by Close()
t.drainPendingReports()
return
case keyIndex := <-t.writtenKeyIndicesChan:
// track keys that have been written so that we can read them in the future
t.handleWrittenKey(keyIndex)
Expand All @@ -381,6 +412,10 @@ func (t *DataTracker) dataGenerator() {
return
case <-t.ctx.Done():
return
case <-t.shutdownChan:
// graceful shutdown initiated by Close()
t.drainPendingReports()
return
case keyIndex := <-t.writtenKeyIndicesChan:
// track keys that have been written so that we can read them in the future
t.handleWrittenKey(keyIndex)
Expand All @@ -398,6 +433,26 @@ func (t *DataTracker) dataGenerator() {
}
}

// drainPendingReports passes every report that can be received from writtenKeyIndicesChan
// without blocking to handleWrittenKey, in the order received, and then returns.
//
// The data generator goroutine calls this when it observes the shutdown signal, so that
// reports already queued by ReportWrite are handled before the goroutine exits.
//
// The drain never waits: it stops the first time no report is immediately available.
// NOTE(review): a report queued after that point is not handled here. This relies on callers
// having finished their ReportWrite calls before Close() is invoked — confirm against callers.
func (t *DataTracker) drainPendingReports() {
	for {
		select {
		case pending := <-t.writtenKeyIndicesChan:
			t.handleWrittenKey(pending)
			continue
		default:
		}
		// Nothing was immediately receivable, so the drain is complete.
		return
	}
}

// handleWrittenKey handles a key that has been written to the database.
func (t *DataTracker) handleWrittenKey(keyIndex uint64) {
// Add key index to the set of written keys we are tracking.
Expand Down
16 changes: 11 additions & 5 deletions sei-db/db_engine/litt/dbcache/cached_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,28 @@ func (c *cachedTable) Name() string {
return c.base.Name()
}

func (c *cachedTable) Put(key []byte, value []byte) error {
err := c.base.Put(key, value)
func (c *cachedTable) Put(key []byte, value []byte, secondaryKeys ...*types.SecondaryKey) error {
err := c.base.Put(key, value, secondaryKeys...)
if err != nil {
return fmt.Errorf("failed to put entry into base table: %w", err)
}
c.writeCache.Put(string(key), value)
for _, sk := range secondaryKeys {
c.writeCache.Put(string(sk.Key), value[sk.Offset:sk.Offset+sk.Length])
}
return nil
}

func (c *cachedTable) PutBatch(batch []*types.KVPair) error {
func (c *cachedTable) PutBatch(batch []*types.PutRequest) error {
err := c.base.PutBatch(batch)
if err != nil {
return err
}
for _, kv := range batch {
c.writeCache.Put(util.UnsafeBytesToString(kv.Key), kv.Value)
for _, req := range batch {
c.writeCache.Put(util.UnsafeBytesToString(req.Key), req.Value)
for _, sk := range req.SecondaryKeys {
c.writeCache.Put(util.UnsafeBytesToString(sk.Key), req.Value[sk.Offset:sk.Offset+sk.Length])
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion sei-db/db_engine/litt/disktable/control_loop_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type controlLoopWriteRequest struct {
controlLoopMessage

// values is the batch of put requests to write.
values []*types.KVPair
values []*types.PutRequest
}

// controlLoopSetShardingFactorRequest is a request to set the sharding factor that is sent to the control loop.
Expand Down
Loading
Loading