Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ func (w *WAL) Append(entry DeltaEntry) error {
return ErrWALFull
}

// Write [4-byte length][data]
// Write [4-byte length][data] as a single Write to avoid a
// torn-record window: a crash between the length write and the
// data write would leave a valid length prefix on disk with no
// corresponding data, which causes Replay to fail on recovery.
var lenBuf [4]byte
binary.LittleEndian.PutUint32(lenBuf[:], uint32(len(data)))
record := make([]byte, 0, 4+len(data))
record = append(record, lenBuf[:]...)
record = append(record, data...)

if _, err := w.f.Write(lenBuf[:]); err != nil {
return fmt.Errorf("write WAL length: %w", err)
}
if _, err := w.f.Write(data); err != nil {
return fmt.Errorf("write WAL data: %w", err)
if _, err := w.f.Write(record); err != nil {
return fmt.Errorf("write WAL record: %w", err)
}
if err := w.f.Sync(); err != nil {
return fmt.Errorf("sync WAL: %w", err)
Expand Down Expand Up @@ -179,6 +182,17 @@ func (w *WAL) Replay(fn func(DeltaEntry) error) (int, error) {

data := make([]byte, length)
if _, err := io.ReadFull(w.f, data); err != nil {
// If the data read comes up short (EOF or
// ErrUnexpectedEOF), this is a torn tail from a
// crash that occurred after writing the length
// prefix but before writing the full payload.
// Return the entries replayed so far rather than
// failing the entire recovery — the torn entry
// will be re-created by the next mutation that
// arrives and gets appended.
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return count, fmt.Errorf("read WAL data at entry %d: %w", count, err)
}

Expand Down
57 changes: 57 additions & 0 deletions wal/zz_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package wal_test

import (
"encoding/binary"
"encoding/json"
"os"
"path/filepath"
Expand Down Expand Up @@ -288,5 +289,61 @@ func mustMarshal(t *testing.T, v interface{}) json.RawMessage {
return b
}

// TestWALReplayTornTail verifies that a WAL file whose final record is
// incomplete is handled gracefully: Replay returns the entries that precede
// the torn tail and reports no error.
//
// Two torn-tail shapes are covered, matching the two errors io.ReadFull can
// report for a short payload read:
//   - the length prefix is present but no payload bytes follow (io.EOF);
//   - the length prefix is present and only part of the payload follows
//     (io.ErrUnexpectedEOF).
func TestWALReplayTornTail(t *testing.T) {
	t.Parallel()

	good := wal.DeltaEntry{Type: wal.DeltaRegister, NodeID: 42}
	goodData, err := json.Marshal(good)
	if err != nil {
		t.Fatalf("marshal good entry: %v", err)
	}

	// frame builds an on-disk record: a 4-byte little-endian length prefix
	// followed by payload. The declared length is passed separately so a
	// record can claim more bytes than it actually carries.
	frame := func(declared uint32, payload []byte) []byte {
		rec := make([]byte, 4, 4+len(payload))
		binary.LittleEndian.PutUint32(rec, declared)
		return append(rec, payload...)
	}

	cases := []struct {
		name string
		tail []byte // bytes written after the one complete record
	}{
		{name: "length prefix only", tail: frame(512, nil)},
		{name: "partial payload", tail: frame(512, []byte("partial"))},
	}

	for _, tc := range cases {
		tc := tc // per-iteration copy for parallel subtests on pre-1.22 toolchains
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			path := filepath.Join(t.TempDir(), "torn.wal")

			// Write the file directly (bypassing Append) so the torn
			// state can be constructed byte-for-byte.
			buf := append(frame(uint32(len(goodData)), goodData), tc.tail...)
			if err := os.WriteFile(path, buf, 0600); err != nil {
				t.Fatalf("WriteFile: %v", err)
			}

			w, err := wal.NewWAL(path)
			if err != nil {
				t.Fatalf("NewWAL: %v", err)
			}
			defer w.Close()

			var replayed []wal.DeltaEntry
			count, err := w.Replay(func(e wal.DeltaEntry) error {
				replayed = append(replayed, e)
				return nil
			})
			if err != nil {
				t.Fatalf("Replay should not error on torn tail: %v", err)
			}
			if count != 1 {
				t.Errorf("Replay count = %d, want 1 (only the complete entry)", count)
			}
			if len(replayed) != 1 {
				t.Fatalf("replayed %d entries, want 1", len(replayed))
			}
			if replayed[0].Type != wal.DeltaRegister || replayed[0].NodeID != 42 {
				t.Errorf("replayed entry = %+v, want DeltaRegister node 42", replayed[0])
			}
		})
	}
}

// Blank-identifier reference to package os: it keeps the "os" import in use
// so the unused-import check cannot reject this file.
var _ = os.DevNull
Loading