diff --git a/wal/wal.go b/wal/wal.go index e566054..7ea0f41 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -129,15 +129,18 @@ func (w *WAL) Append(entry DeltaEntry) error { return ErrWALFull } - // Write [4-byte length][data] + // Write [4-byte length][data] as a single Write to avoid a + // torn-record window: a crash between the length write and the + // data write would leave a valid length prefix on disk with no + // corresponding data, which causes Replay to fail on recovery. var lenBuf [4]byte binary.LittleEndian.PutUint32(lenBuf[:], uint32(len(data))) + record := make([]byte, 0, 4+len(data)) + record = append(record, lenBuf[:]...) + record = append(record, data...) - if _, err := w.f.Write(lenBuf[:]); err != nil { - return fmt.Errorf("write WAL length: %w", err) - } - if _, err := w.f.Write(data); err != nil { - return fmt.Errorf("write WAL data: %w", err) + if _, err := w.f.Write(record); err != nil { + return fmt.Errorf("write WAL record: %w", err) } if err := w.f.Sync(); err != nil { return fmt.Errorf("sync WAL: %w", err) @@ -179,6 +182,17 @@ func (w *WAL) Replay(fn func(DeltaEntry) error) (int, error) { data := make([]byte, length) if _, err := io.ReadFull(w.f, data); err != nil { + // If the data read comes up short (EOF or + // ErrUnexpectedEOF), this is a torn tail from a + // crash that occurred after writing the length + // prefix but before writing the full payload. + // Return the entries replayed so far rather than + // failing the entire recovery — the torn entry + // will be re-created by the next mutation that + // arrives and gets appended. + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } return count, fmt.Errorf("read WAL data at entry %d: %w", count, err) } diff --git a/wal/zz_wal_test.go b/wal/zz_wal_test.go index 8f1a5d4..5d5dfe0 100644 --- a/wal/zz_wal_test.go +++ b/wal/zz_wal_test.go @@ -3,6 +3,7 @@ package wal_test import ( + "encoding/binary" "encoding/json" "os" "path/filepath" @@ -288,5 +289,61 @@ func mustMarshal(t *testing.T, v interface{}) json.RawMessage { return b } +// TestWALReplayTornTail verifies that a WAL file with a partial final record +// (length prefix written but data missing — the crash-window bug) is handled +// gracefully: Replay returns the entries before the torn tail without error. +func TestWALReplayTornTail(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "torn.wal") + + // Write a complete entry followed by a torn partial entry directly to + // the file (bypass Append so we can create the torn state). + good := wal.DeltaEntry{Type: wal.DeltaRegister, NodeID: 42} + goodData, err := json.Marshal(good) + if err != nil { + t.Fatalf("marshal good entry: %v", err) + } + + var buf []byte + // Complete entry: [4-byte len][data] + len4 := make([]byte, 4) + binary.LittleEndian.PutUint32(len4, uint32(len(goodData))) + buf = append(buf, len4...) + buf = append(buf, goodData...) + // Torn tail: [4-byte len for 512 bytes][no data following] + lenTorn := make([]byte, 4) + binary.LittleEndian.PutUint32(lenTorn, 512) + buf = append(buf, lenTorn...) + + if err := os.WriteFile(path, buf, 0600); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + w, err := wal.NewWAL(path) + if err != nil { + t.Fatalf("NewWAL: %v", err) + } + defer w.Close() + + var replayed []wal.DeltaEntry + count, err := w.Replay(func(e wal.DeltaEntry) error { + replayed = append(replayed, e) + return nil + }) + if err != nil { + t.Fatalf("Replay should not error on torn tail: %v", err) + } + if count != 1 { + t.Errorf("Replay count = %d, want 1 (only the complete entry)", count) + } + if len(replayed) != 1 { + t.Fatalf("replayed %d entries, want 1", len(replayed)) + } + if replayed[0].Type != wal.DeltaRegister || replayed[0].NodeID != 42 { + t.Errorf("replayed entry = %+v, want DeltaRegister node 42", replayed[0]) + } +} + // Ensure the test binary doesn't import os only for the unused import linter. var _ = os.DevNull