Reviewed commit fb202ed ("Self healing"). The self-healing feature is well-structured with a clean three-phase protocol (Request/Verify/Commit), solid test coverage across unit, E2E, and system tests, and good use of lease-claim event processing with exponential backoff.
Pull request overview
This PR introduces “self-healing” behavior in the P2P DHT by adding a rebalance worker and new probe/status plumbing, while also refining chain-state eligibility gating (routing vs store/write) and bootstrap behavior to better recover from transient chain/API issues.
Changes:
- Add a rebalance background worker that scans local keys, heals under-replicated keys, and deletes redundant replicas when safe.
- Introduce side-effect-free local key status APIs (RetrieveBatchLocalStatus, ListLocalKeysPage) plus a new network message (BatchProbeKeys) to probe key presence across peers.
- Split chain-state eligibility into routing-eligible vs store-eligible allowlists (Active+Postponed vs Active-only), and adjust bootstrap + batch-store selection accordingly.
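The routing vs store eligibility split described in the changes list can be illustrated with a minimal sketch. Everything here is an assumption for illustration: the State and Supernode types, the StateOther catch-all, and BuildAllowlists are hypothetical names, not the PR's actual code. Only the rule itself (Active+Postponed for routing, Active-only for store) comes from the overview.

```go
package main

import (
	"fmt"
	"sort"
)

// State is a hypothetical stand-in for the chain's supernode state.
type State int

const (
	StateActive State = iota
	StatePostponed
	StateOther // any other state; assumed ineligible for both roles
)

// Supernode is a hypothetical, minimal view of a chain-registered peer.
type Supernode struct {
	ID    string
	State State
}

// BuildAllowlists splits peers into a routing-eligible set (Active and
// Postponed) and a store-eligible set (Active only).
func BuildAllowlists(nodes []Supernode) (routing, store map[string]bool) {
	routing = make(map[string]bool)
	store = make(map[string]bool)
	for _, n := range nodes {
		switch n.State {
		case StateActive:
			routing[n.ID] = true
			store[n.ID] = true
		case StatePostponed:
			routing[n.ID] = true
		}
	}
	return routing, store
}

// sortedKeys returns the set members in a stable order for printing.
func sortedKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	nodes := []Supernode{{"a", StateActive}, {"b", StatePostponed}, {"c", StateOther}}
	routing, store := BuildAllowlists(nodes)
	fmt.Println("routing:", sortedKeys(routing))
	fmt.Println("store:", sortedKeys(store))
}
```

Under this sketch a Postponed peer stays reachable for lookups but is never chosen as a write target, which is the distinction the overview draws between routing and store/write gating.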
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| supernode/adaptors/lumera.go | Forces top-supernode selection to be Active-only for write/finalization eligibility. |
| p2p/kademlia/store/sqlite/replication.go | Adds sqlite implementations for local key status + paged key listing. |
| p2p/kademlia/store/mem/mem.go | Adds mem-store implementations for local key status + paged key listing (currently inconsistent with hex-key expectations). |
| p2p/kademlia/store.go | Extends store interface with side-effect-free local key status APIs and defines LocalKeyStatus. |
| p2p/kademlia/rebalance_worker.go | New rebalance worker that probes holders, heals replicas, and deletes redundant local copies after confirmation. |
| p2p/kademlia/node_activity.go | Refines node activity handling to distinguish routing eligibility vs store/replication eligibility. |
| p2p/kademlia/network.go | Adds BatchProbeKeys handler + timeouts/dispatch wiring. |
| p2p/kademlia/message.go | Adds new gob-registered message types for BatchProbeKeys. |
| p2p/kademlia/dht_batch_store_test.go | Updates expected error text for new “eligible store peers” behavior. |
| p2p/kademlia/dht.go | Adds store allowlist, store-eligibility gating in write paths, starts rebalance worker, and adjusts batch-store candidate selection. |
| p2p/kademlia/bootstrap.go | Splits routing vs store allowlists from chain state (Active+Postponed vs Active-only) and makes bootstrap sync more resilient with periodic retries. |
e5f2d3a to 046c128
p2p/kademlia/rebalance_worker.go
Outdated
    // Preserve uploader/original copies even when this node is not in the
    // current owner set and replica count is healthy.
    if selfStatus.IsOriginal {
    delete(deleteConfirm, keyHex)
    continue
    }

    if rebalanceShouldTrackDeleteConfirm(isOwner, holders) {
    deleteConfirm[keyHex]++
    if rebalanceShouldDelete(deleteConfirm[keyHex], deleted) {
    if err := s.store.BatchDeleteRecords([]string{keyHex}); err != nil {
    logtrace.Error(ctx, "rebalance: local delete failed", logtrace.Fields{
    logtrace.FieldModule: "p2p",
    logtrace.FieldError: err.Error(),
The new IsOriginal guard and the re-indented rebalanceShouldTrackDeleteConfirm block are one tab deeper than the surrounding code (4 tabs vs the expected 3). Because Go compiles on brace matching, not whitespace, this won't break at runtime, but it makes the control flow misleading -- the } else { on line 197 visually looks like it belongs to a different block than it actually closes. The logtrace.Error call inside the if err != nil block also ended up at the same indentation as its opening brace. Running gofmt on this file should fix it in one pass.
Suggested change (indentation only; the before and after lines are otherwise identical):

    // Preserve uploader/original copies even when this node is not in the
    // current owner set and replica count is healthy.
    if selfStatus.IsOriginal {
    	delete(deleteConfirm, keyHex)
    	continue
    }
    if rebalanceShouldTrackDeleteConfirm(isOwner, holders) {
    	deleteConfirm[keyHex]++
    	if rebalanceShouldDelete(deleteConfirm[keyHex], deleted) {
    		if err := s.store.BatchDeleteRecords([]string{keyHex}); err != nil {
    			logtrace.Error(ctx, "rebalance: local delete failed", logtrace.Fields{
    				logtrace.FieldModule: "p2p",
    				logtrace.FieldError:  err.Error(),
Incremental review of changes since 3bf7e44. The self-healing feature is well-structured with solid test coverage. A few items worth addressing -- mostly around code duplication and a typo.
    defaultEventWorkers = 8

    requestTimeout     = 20 * time.Second
    verificationTimout = 20 * time.Second
nit / typo: verificationTimout is missing an e -- should be verificationTimeout.
Suggested change:

    - verificationTimout = 20 * time.Second
    + verificationTimeout = 20 * time.Second

    	return client.CommitSelfHealing(cctx, req)
    }

    func parseHostAndPort(address string, defaultPort int) (host string, port int, ok bool) {
duplication: parseHostAndPort is duplicated here and in supernode/storage_challenge/service.go:498. The storage-challenge version is more robust (handles IPv6 bracketed literals, zone IDs, stray-bracket rejection). This copy silently drops those edge cases.
Consider extracting the storage-challenge version into a shared internal package (e.g. pkg/netutil) and importing it from both call sites.
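As a rough illustration of what a shared helper could look like, here is a minimal sketch built on the standard library's net.SplitHostPort. It is not the storage-challenge implementation, which this review does not show. The exported name ParseHostAndPort and the exact rejection rules are assumptions made for the example.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// ParseHostAndPort is a hypothetical shared helper (not the repository's code).
// It accepts "host", "host:port", "[ipv6]", and "[ipv6%zone]:port", and
// falls back to defaultPort when the address carries no port.
func ParseHostAndPort(address string, defaultPort int) (host string, port int, ok bool) {
	address = strings.TrimSpace(address)
	if address == "" {
		return "", 0, false
	}
	// Case 1: the address has an explicit port.
	if h, p, err := net.SplitHostPort(address); err == nil {
		n, convErr := strconv.Atoi(p)
		if convErr != nil || n < 1 || n > 65535 || h == "" {
			return "", 0, false
		}
		return h, n, true
	}
	// Case 2: no port. Accept a bare host or a bracketed IPv6 literal.
	h := address
	if strings.HasPrefix(h, "[") && strings.HasSuffix(h, "]") {
		h = h[1 : len(h)-1]
	}
	// Reject empty hosts and stray brackets such as "host]".
	if h == "" || strings.ContainsAny(h, "[]") {
		return "", 0, false
	}
	// A port-less host containing ':' must be a valid IPv6 literal,
	// optionally followed by a zone ID ("fe80::1%eth0").
	if strings.Contains(h, ":") {
		ipPart := h
		if i := strings.IndexByte(h, '%'); i >= 0 {
			ipPart = h[:i]
		}
		if net.ParseIP(ipPart) == nil {
			return "", 0, false
		}
	}
	return h, defaultPort, true
}

func main() {
	for _, addr := range []string{"example.com", "example.com:4444", "[::1]:9000", "host]"} {
		h, p, ok := ParseHostAndPort(addr, 4445)
		fmt.Printf("%q -> host=%q port=%d ok=%v\n", addr, h, p, ok)
	}
}
```

Keeping one copy in a shared package means the IPv6 and stray-bracket handling cannot drift between the two call sites.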
    	return out, true
    }

    func pickActionAnchorKey(keys []string) string {
duplication: pickActionAnchorKey is duplicated verbatim in supernode/transport/grpc/self_healing/handler.go:668. Worth extracting to a shared location (e.g. pkg/cascadekit or a new internal/selfhealing package) to keep the two in sync.
    	ReconstructedHashHex: resp.ReconstructedHashHex,
    	ObserverId:           ob,
    	ActionId:             pl.ActionID,
    }, verificationTimout)
nit: The verificationTimout typo from line 64 is referenced here. Once the constant is renamed, this call site will need updating too.
    	pi.AvgPingResponseTime = pi.CumulativeResponseTime / float64(pi.TotalSuccessfulPings)
    }
    _ = s.store.UpsertPingHistory(pi)
    _ = ctx
nit: _ = ctx is dead code -- ctx is a function parameter, and the Go compiler does not flag unused parameters, so the blank assignment serves no purpose. Same on line 395. Safe to remove both.