Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sei-db/ledger_db/parquet/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (s *Store) startPruning(pruneIntervalSeconds int64) {
latestVersion := s.latestVersion.Load()
pruneBeforeBlock := latestVersion - s.config.KeepRecent
if pruneBeforeBlock > 0 {
pruned := s.pruneOldFiles(uint64(pruneBeforeBlock))
pruned := s.PruneOldFiles(uint64(pruneBeforeBlock))
if pruned > 0 {
logger.Info("Pruned parquet file pairs older than block", "pruned-count", pruned, "block", pruneBeforeBlock)
}
Expand All @@ -394,7 +394,9 @@ func (s *Store) startPruning(pruneIntervalSeconds int64) {
}()
}

func (s *Store) pruneOldFiles(pruneBeforeBlock uint64) int {
// PruneOldFiles removes parquet file pairs whose data is entirely before
// pruneBeforeBlock. Returns the number of file pairs removed.
func (s *Store) PruneOldFiles(pruneBeforeBlock uint64) int {
// Get list of files to prune from the reader
filesToPrune := s.Reader.GetFilesBeforeBlock(pruneBeforeBlock)
if len(filesToPrune) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion sei-db/ledger_db/parquet/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestPruneOldFilesKeepsTrackingOnDeleteFailure(t *testing.T) {
t.Cleanup(func() { removeFile = origRemove })

// 0 + 500 <= 600, so file pair is prune-eligible.
pruned := store.pruneOldFiles(600)
pruned := store.PruneOldFiles(600)
require.Equal(t, 0, pruned)

// Receipt file should remain tracked because delete failed.
Expand Down
35 changes: 26 additions & 9 deletions sei-db/ledger_db/receipt/parquet_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package receipt

import (
"errors"
"math/big"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -187,7 +188,8 @@ func TestParquetFilePruning(t *testing.T) {
cfg := dbconfig.DefaultReceiptStoreConfig()
cfg.Backend = "parquet"
cfg.DBDirectory = t.TempDir()
cfg.KeepRecent = 600 // Keep 600 blocks, so files with blocks < (latest - 600) get pruned
cfg.KeepRecent = 600
cfg.PruneIntervalSeconds = 0 // Disable background pruning; we trigger it manually below.

store, err := NewReceiptStore(cfg, storeKey)
require.NoError(t, err)
Expand All @@ -197,7 +199,7 @@ func TestParquetFilePruning(t *testing.T) {
// File 1: blocks 501-1000
// File 2: blocks 1001-1500
for block := uint64(1); block <= 1500; block++ {
txHash := common.BigToHash(common.Big1.SetUint64(block))
txHash := common.BigToHash(new(big.Int).SetUint64(block))
receipt := makeTestReceipt(txHash, block, 0, common.HexToAddress("0x1"), nil)
err := store.SetReceipts(ctx.WithBlockHeight(int64(block)), []ReceiptRecord{
{TxHash: txHash, Receipt: receipt},
Expand All @@ -208,22 +210,37 @@ func TestParquetFilePruning(t *testing.T) {
// Close to flush all files
require.NoError(t, store.Close())

// Verify we have at least 2 receipt and log files
receiptFiles, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "receipts_*.parquet"))
// Verify we have at least 2 receipt and log files before pruning
receiptFilesBefore, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "receipts_*.parquet"))
require.NoError(t, err)
require.GreaterOrEqual(t, len(receiptFiles), 2, "should have at least 2 receipt files")
require.GreaterOrEqual(t, len(receiptFilesBefore), 2, "should have at least 2 receipt files")

logFiles, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "logs_*.parquet"))
logFilesBefore, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "logs_*.parquet"))
require.NoError(t, err)
require.GreaterOrEqual(t, len(logFiles), 2, "should have at least 2 log files")
require.GreaterOrEqual(t, len(logFilesBefore), 2, "should have at least 2 log files")

// Reopen store - pruning will run in background
// Reopen store (no background pruning because PruneIntervalSeconds == 0)
store, err = NewReceiptStore(cfg, storeKey)
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

// Manually trigger pruning synchronously: prune files entirely before block 900
// (latestVersion 1500 - keepRecent 600 = 900)
pqStore := store.(*cachedReceiptStore).backend.(*parquetReceiptStore)
pruned := pqStore.store.PruneOldFiles(900)
require.Greater(t, pruned, 0, "should have pruned at least one file pair")

// Verify files were actually removed from disk
receiptFilesAfter, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "receipts_*.parquet"))
require.NoError(t, err)
require.Less(t, len(receiptFilesAfter), len(receiptFilesBefore), "pruning should have removed receipt files")

logFilesAfter, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "logs_*.parquet"))
require.NoError(t, err)
require.Less(t, len(logFilesAfter), len(logFilesBefore), "pruning should have removed log files")

// Verify we can still query recent data
txHash := common.BigToHash(common.Big1.SetUint64(1400))
txHash := common.BigToHash(new(big.Int).SetUint64(1400))
_, err = store.GetReceiptFromStore(ctx, txHash)
require.NoError(t, err)
}
Expand Down
Loading