diff --git a/docs/postmortems/2026-03-09-qbft-compare-timer-race.md b/docs/postmortems/2026-03-09-qbft-compare-timer-race.md
new file mode 100644
index 0000000000..27ee24ad6f
--- /dev/null
+++ b/docs/postmortems/2026-03-09-qbft-compare-timer-race.md
@@ -0,0 +1,166 @@
+# Post-Mortem: QBFT Compare Timer Race Condition
+
+**Date**: 2026-03-09
+**Severity**: Medium — 2.5% attester duty failure rate across affected nodes in mainnet fleet
+**Duration**: Ongoing from 2026-03-09 ~10:00 UTC, fix shipped in v1.9.2-rc1
+**Author**: Oisin Kyne
+
+## Summary
+
+A race condition in the QBFT consensus `compare()` function caused spurious round timeouts when the eager double-linear timer's absolute deadline expired before the Compare callback was scheduled by the Go runtime. The bug was latent in all versions with the Compare mechanism (v1.8.x, v1.9.0, v1.9.1) but was triggered fleet-wide by a DNS outage that shifted p2p message delivery timing past a critical threshold.
+
+## Impact
+
+- **Global mainnet attester timeout rate**: ~100/h (baseline) → ~2,700/h (25x increase)
+- **Per-cluster failure rate**: ~2.5% of attester duties affected
+- **Consensus decisions**: still occurred in most cases (round 2+), but with elevated latency (p99: 1.7s → 4.8s)
+- **No slashing risk**: the bug caused missed consensus rounds, not conflicting signatures
+- **Economic impact**: marginal — affected attestations were delayed, not permanently lost
+
+## Timeline
+
+| Time (UTC) | Event |
+|---|---|
+| 2026-03-09 ~09:40 | DNS outage begins — Obol domain names unreachable for ~20 minutes |
+| 2026-03-09 ~09:40 | Prometheus remote write (Victoria Metrics) stops receiving data — visible as metrics gap |
+| 2026-03-09 ~10:00 | DNS resolves — p2p connections re-establish with slightly different routing/latency |
+| 2026-03-09 ~10:30 | Attester consensus timeouts begin climbing globally |
+| 2026-03-09 ~11:00 | Global rate stabilises at ~2,700/h (from ~100/h baseline) |
+| 2026-03-09 | Investigation begins — initially attributed to slow beacon nodes |
+| 2026-03-10 | BN latency confirmed at 5ms throughout — not a beacon node issue |
+| 2026-03-10 | `compareAttestations` confirmed `false` on all affected clusters — Compare callback returns nil instantly |
+| 2026-03-10 | Race condition identified: Go `select` non-deterministically picks between simultaneously-ready `compareErr` and `timerChan` channels |
+| 2026-03-11 | Fix shipped in v1.9.2-rc1 (Compare mechanism removed entirely) |
+| 2026-03-11 | plain-garden upgraded on test cluster — compare timeouts drop to 0 immediately |
+
+## Root Cause
+
+### The Bug
+
+In `core/qbft/qbft.go`, the `compare()` function (lines 438-466 in v1.9.1) waits on three channels:
+
+```go
+select {
+case err := <-compareErr: // Compare callback result
+	if err != nil {
+		return inputValueSource, errCompare
+	}
+	return inputValueSource, nil
+case inputValueSource = <-compareValue: // Local value for comparison
+case <-timerChan: // Round timer
+	log.Warn(ctx, "", errors.New("timeout waiting for local data..."))
+	return inputValueSource, errTimeout
+}
+```
+
+The `d.Compare` callback runs in a goroutine and sends `nil` on `compareErr` almost instantly when `compareAttestations` is `false` (the case for all production clusters — the feature was gated behind the `chain_split_halt` alpha flag that was rarely enabled).
+
+The `timerChan` comes from the `EagerDoubleLinear` round timer, which calculates **absolute deadlines** based on slot start time:
+
+- First call: `deadline = slotStart + dutyDelay + roundTimeout` (e.g., `slotStart + 4s + 1s = slotStart + 5s`)
+- Second call (on justified pre-prepare): `deadline = firstDeadline + roundTimeout` (e.g., `slotStart + 5s + 1s = slotStart + 6s`)
+
+When `UponJustifiedPrePrepare` fires and calls `d.NewTimer(round)`, if the current time is past `slotStart + 6s`, the timer fires **immediately** with a zero or negative duration.
Both `compareErr` and `timerChan` are ready simultaneously, and Go's `select` picks between them uniformly at random — causing a ~50% spurious timeout rate for any pre-prepare arriving after the doubled deadline.
+
+### The Trigger
+
+The DNS outage on 2026-03-09 disrupted Obol infrastructure for ~20 minutes (relay endpoints, Prometheus remote write). The global attester timeout rate jumped from ~100/h to ~2,700/h immediately after.
+
+**Note**: Post-incident analysis of `p2p_ping_latency_secs` shows p2p latency was **unchanged** across the inflection point (p50 ~27ms, p99 ~400ms globally, flat throughout). Consensus duration p50 was also flat at ~130-150ms. The direct p2p TCP connections between peers were not affected by the DNS outage.
+
+The exact mechanism by which the DNS outage triggered the latent race condition is **not fully understood**. Hypotheses:
+- Nodes that reconnected after the outage may have had a brief period of goroutine scheduling pressure (reconnection storms, buffered message replay) that caused some pre-prepares to arrive marginally later
+- The Prometheus remote write failure may have caused backpressure affecting goroutine scheduling
+- The correlation with the DNS outage may be coincidental, and an unrelated change (e.g., the Lighthouse v8.1.2 beacon node release around the same time) may have contributed
+
+Regardless of the trigger, the race itself is well-characterised: any pre-prepare arriving after `slotStart + 6s` has a ~50% chance of spurious timeout.
+
+### Why It Was Self-Sustaining
+
+The timeout rate stabilised at ~2,700/h and neither grew nor decayed over 36+ hours.
This is because: + +- The race condition triggers whenever a pre-prepare arrives after the doubled timer deadline — this is a function of the normal distribution of message timing, not a feedback loop +- No positive feedback loop (timeouts don't cause more timeouts for the same duty) +- Whatever shifted the timing distribution past the threshold was persistent (not a transient backlog that drains) + +## Fix + +### Immediate (v1.9.2-rc1) + +As an immediate mitigation, the `compare()` function and the Compare mechanism were temporarily removed from the QBFT package in v1.9.2-rc1 (commit `e8a3ef5c`). This eliminates the race condition by removing the code path entirely: + +- Removed `C` type parameter from `Definition`, `Transport`, `Msg`, `Run` +- Removed `compare()` function, `errCompare`, `errTimeout`, `compareFailureRound` +- Removed `Compare` callback from `Definition` +- Removed `ValueSource()` from `Msg` interface +- Removed `VerifyCh` from consensus instance IO +- Removed `attestationChecker` and `supportedCompareDuties` +- `UponJustifiedPrePrepare` now directly broadcasts `MsgPrepare` without calling Compare + +**13 files changed, -633 lines.** All existing tests pass. + +**This removal is temporary.** The chain-split-halt / Compare mechanism is a desired safety feature and will be re-introduced with the race condition fixed (see permanent fix below). + +### Permanent Fix (for re-introduction of Compare) + +When Compare is re-added, the `compare()` function must prioritise `compareErr` over `timerChan` in the `select` to prevent the race: + +```go +case <-timerChan: + // Check if Compare already completed before declaring timeout. + // With eager absolute-deadline timers, timerChan may fire immediately + // (zero duration) when the deadline is already past. If d.Compare has + // also completed, Go's select picks randomly. Priority-check compareErr + // first to avoid spurious timeouts. 
+ select { + case err := <-compareErr: + if err != nil { + return inputValueSource, errCompare + } + return inputValueSource, nil + default: + return inputValueSource, errTimeout + } +``` + +This was tested on a branch off `v1.9.1` (`fix/qbft-compare-timer-race`) and all tests pass. + +## Detection Gaps + +### What made this hard to find + +1. **Misleading BN latency correlation**: One peer (`plain-garden`) had elevated `attestation_data` p50 latency (~160ms vs 5ms on other peers). This initially looked like the root cause. Historical analysis showed this was a pre-existing condition on that operator's beacon node, unrelated to the inflection point — BN latency was 5ms throughout the period when timeouts spiked. + +2. **`compareAttestations` assumed enabled**: The log message "timeout waiting for local data, used for comparing with leader's proposed data" strongly implies the Compare feature is active. It took explicit verification via Prometheus feature flag metrics to confirm it was disabled on all affected clusters, revealing the race condition theory. + +3. **Existing tooling was proposal-focused**: The `missed-proposal` skill only analysed proposer duties. A new `qbft-debug` skill and script were built during this investigation to query Loki for compare timeouts, correlate with BN call performance, and analyse consensus outcomes. + +## Planned Permanent Remediations + +### Short-term + +- [x] Ship v1.9.2 with Compare mechanism temporarily removed (eliminates the race condition) +- [ ] Upgrade affected clusters — prioritise nodes on v1.9.1 (have the bug, easy upgrade path), v1.8.2 nodes are less affected + +### Medium-term + +- [ ] **Early consensus termination**: When a QBFT instance can't reach quorum (peers have decided and left), it currently spirals through rounds until the 25s hard deadline. Implement deadliner-based context cancellation so expired duties exit immediately rather than burning CPU and p2p bandwidth on futile round changes. 
+ +- [ ] **Drop expired duty messages at p2p layer**: `ParSigDB.StoreExternal` processes all incoming partial signatures including for expired duties. Add a fast-path check at the p2p handler or parsigdb level to reject messages for duties past their deadline without acquiring the mutex or logging. + +- [ ] **Mitigate post-restart partial signature replay storm**: When a node restarts, peers re-send buffered partial signatures for recent duties. The restarted node's empty `ParSigDB` accepts the first copy and then processes thousands of duplicates synchronously, each acquiring `db.mu` and logging at debug level. This burst (~9,000 messages/min observed, lasting ~2 minutes) blocks goroutine scheduling and causes consensus timeouts during the restart window. Fixes should include: + - Deduplicating or rate-limiting incoming parsig messages during the replay window + - Dropping messages for duties that have already expired per the deadliner + - Making `ParSigEx.handle()` async (see below) + +- [ ] **Async parsig processing**: `ParSigEx.handle()` calls subscribers synchronously (noted as `TODO(corver): Call this async` in the code). Making this async with a bounded channel would prevent p2p message floods from blocking handlers. + +### Long-term + +- [ ] **Consensus health monitoring**: Add metrics for round progression (how many rounds before decision), quorum reachability (can this node reach quorum given known peer states), and Compare callback latency. These would have surfaced the issue faster. + +- [ ] **Deterministic timer safety**: Any future use of absolute-deadline timers in combination with `select` statements should use priority receives (nested select with default) to prevent the Go `select` non-determinism from causing spurious timeouts. + +## Lessons Learned + +1. **Go `select` non-determinism is a real hazard with absolute timers.** When a timer channel can fire immediately (zero/negative duration), it races with other ready channels. 
This is a class of bug that's invisible in testing (timers are always relative in tests) and only manifests in production with real clock alignment. + +2. **Disabled code paths still execute.** The Compare mechanism was effectively a no-op (`compareAttestations=false` everywhere), but its `select` loop still ran on every `UponJustifiedPrePrepare` event. Features gated by runtime flags still need their code paths to be correct. + +3. **Infrastructure outages can trigger latent bugs.** The DNS outage didn't cause the bug — it correlated with the onset of symptoms. The exact trigger mechanism is not fully understood, but the race condition existed since the Compare mechanism was introduced and could be triggered by any change affecting pre-prepare delivery timing. diff --git a/scripts/debug/qbft_compare_timeout.py b/scripts/debug/qbft_compare_timeout.py new file mode 100644 index 0000000000..985a8da21b --- /dev/null +++ b/scripts/debug/qbft_compare_timeout.py @@ -0,0 +1,743 @@ +#!/usr/bin/env python3 +""" +Diagnose QBFT compare timeout issues ("timeout waiting for local data"). + +This timeout occurs when a node receives the leader's pre-prepare but hasn't +finished fetching its own unsigned data from the beacon node. The round timer +expires while the Compare function waits for inputValueSourceCh (VerifyCh). + +Requires OBOL_GRAFANA_API_TOKEN environment variable. 
+ +Usage: + python qbft_compare_timeout.py [options] + + Options: + --network NETWORK Network name (default: mainnet) + --hours HOURS Hours to look back (default: 24) + --slot SLOT Analyze a specific slot instead of a time range + --limit LIMIT Max log entries to fetch (default: 5000) + +Outputs JSON with: + - timeout_events: parsed timeout occurrences with peer, slot, duty, round info + - correlated_events: BN call timings, consensus outcomes for affected slots + - metrics: aggregate Prometheus metrics (consensus timeouts, BN latency, decided rounds) + - summary: high-level analysis of patterns +""" + +import json +import os +import re +import sys +import urllib.error +import urllib.parse +import urllib.request +from datetime import datetime, timezone + +GRAFANA_BASE = "https://grafana.monitoring.gcp.obol.tech" + +GENESIS_TIME = { + "mainnet": 1606824023, + "hoodi": 1742212800, + "sepolia": 1655733600, +} + +SLOTS_PER_EPOCH = 32 +SECONDS_PER_SLOT = 12 + + +def get_auth_header() -> dict: + token = os.environ.get("OBOL_GRAFANA_API_TOKEN") + if not token: + return {} + return {"Authorization": f"Bearer {token}"} + + +def fetch_json(url: str, headers: dict, silent: bool = False) -> dict | None: + req = urllib.request.Request(url, headers=headers) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + if not silent: + print(f"Error: HTTP {e.code} fetching {url}", file=sys.stderr) + return None + except urllib.error.URLError as e: + if not silent: + print(f"Error: {e.reason}", file=sys.stderr) + return None + except Exception as e: + if not silent: + print(f"Error: {e}", file=sys.stderr) + return None + + +def discover_datasources(headers: dict) -> tuple[str | None, str | None]: + url = f"{GRAFANA_BASE}/api/datasources" + datasources = fetch_json(url, headers) + if not datasources: + return None, None + + prom_id = None + loki_id = None + for ds in datasources: + if ds.get("type") 
== "prometheus" and ds.get("name") == "prometheus": + prom_id = ds.get("id") + if ds.get("type") == "loki" and ds.get("name") == "Loki": + loki_id = ds.get("id") + + prom_url = f"{GRAFANA_BASE}/api/datasources/proxy/{prom_id}/api/v1/" if prom_id else None + loki_url = f"{GRAFANA_BASE}/api/datasources/proxy/{loki_id}/loki/api/v1/" if loki_id else None + return prom_url, loki_url + + +def prom_query(prom_url: str, headers: dict, query: str) -> dict | None: + url = f"{prom_url}query?query={urllib.parse.quote(query)}" + return fetch_json(url, headers, silent=True) + + +def prom_query_range(prom_url: str, headers: dict, query: str, start: int, end: int, step: str = "60s") -> dict | None: + params = urllib.parse.urlencode({ + "query": query, + "start": str(start), + "end": str(end), + "step": step, + }) + url = f"{prom_url}query_range?{params}" + return fetch_json(url, headers, silent=True) + + +def loki_query(loki_url: str, headers: dict, logql: str, start_ns: int, end_ns: int, limit: int = 5000) -> dict | None: + params = urllib.parse.urlencode({ + "query": logql, + "start": str(start_ns), + "end": str(end_ns), + "limit": str(limit), + "direction": "forward", + }) + url = f"{loki_url}query_range?{params}" + return fetch_json(url, headers, silent=True) + + +def extract_logfmt(line: str, field: str) -> str: + m = re.search(rf'{field}="([^"]*)"', line) + if m: + return m.group(1) + m = re.search(rf"{field}=(\S+)", line) + if m: + return m.group(1) + return "" + + +def parse_embedded_ts(line: str) -> int | None: + """Parse the embedded ts= field from a logfmt line, returning nanoseconds since epoch. + + Charon logs include ts=2026-03-09T20:53:59.123456789Z which is the actual application + timestamp. This is more accurate than the Loki receipt timestamp which can have + significant skew (0.5-2s observed). 
+ """ + m = re.search(r'ts=(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\.(\d+)Z', line) + if m: + dt = datetime.fromisoformat(m.group(1) + "+00:00") + base_ns = int(dt.timestamp()) * 1_000_000_000 + frac = m.group(2) + # Pad or truncate to 9 digits (nanoseconds) + frac_ns = int(frac.ljust(9, "0")[:9]) + return base_ns + frac_ns + # Fallback: try without fractional seconds + m = re.search(r'ts=(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z', line) + if m: + dt = datetime.fromisoformat(m.group(1) + "+00:00") + return int(dt.timestamp()) * 1_000_000_000 + return None + + +def get_event_timestamp_ns(loki_ts_str: str, line: str) -> int: + """Get the best available timestamp in nanoseconds: embedded ts= preferred over Loki timestamp.""" + embedded = parse_embedded_ts(line) + if embedded is not None: + return embedded + return int(loki_ts_str) + + +def parse_duty_string(duty_str: str) -> dict: + """Parse duty string like '1234/attester' into components.""" + parts = duty_str.split("/") + if len(parts) == 2: + try: + return {"slot": int(parts[0]), "type": parts[1]} + except ValueError: + pass + return {"slot": 0, "type": duty_str} + + +def slot_to_time(slot: int, network: str) -> str | None: + genesis = GENESIS_TIME.get(network) + if not genesis: + return None + ts = genesis + slot * SECONDS_PER_SLOT + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def query_timeout_events(loki_url: str, headers: dict, cluster_name: str, network: str, + start_ns: int, end_ns: int, limit: int) -> list[dict]: + """Query Loki for compare timeout warnings.""" + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|= `timeout waiting for local data`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, line in stream.get("values", []): + duty_str = 
extract_logfmt(line, "duty") + duty = parse_duty_string(duty_str) + events.append({ + "timestamp_ns": get_event_timestamp_ns(ts_str, line), + "peer": peer, + "duty": duty_str, + "slot": duty["slot"], + "duty_type": duty["type"], + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_round_changes(loki_url: str, headers: dict, cluster_name: str, network: str, + start_ns: int, end_ns: int, limit: int) -> list[dict]: + """Query Loki for QBFT round change events.""" + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|= `QBFT round changed`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, line in stream.get("values", []): + duty_str = extract_logfmt(line, "duty") + duty = parse_duty_string(duty_str) + events.append({ + "timestamp_ns": get_event_timestamp_ns(ts_str, line), + "peer": peer, + "duty": duty_str, + "slot": duty["slot"], + "duty_type": duty["type"], + "round": extract_logfmt(line, "round"), + "new_round": extract_logfmt(line, "new_round"), + "rule": extract_logfmt(line, "rule"), + "timeout_reason": extract_logfmt(line, "timeout_reason"), + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_consensus_decided(loki_url: str, headers: dict, cluster_name: str, network: str, + start_ns: int, end_ns: int, limit: int) -> list[dict]: + """Query Loki for QBFT consensus decided events.""" + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|= `QBFT consensus decided`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, line in stream.get("values", []): + 
duty_str = extract_logfmt(line, "duty") + duty = parse_duty_string(duty_str) + events.append({ + "timestamp_ns": get_event_timestamp_ns(ts_str, line), + "peer": peer, + "duty": duty_str, + "slot": duty["slot"], + "duty_type": duty["type"], + "round": extract_logfmt(line, "round"), + "leader_name": extract_logfmt(line, "leader_name"), + "leader_index": extract_logfmt(line, "leader_index"), + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_consensus_timeouts(loki_url: str, headers: dict, cluster_name: str, network: str, + start_ns: int, end_ns: int, limit: int) -> list[dict]: + """Query Loki for full consensus timeout errors.""" + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|= `consensus timeout`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, line in stream.get("values", []): + duty_str = extract_logfmt(line, "duty") + duty = parse_duty_string(duty_str) + events.append({ + "timestamp_ns": get_event_timestamp_ns(ts_str, line), + "peer": peer, + "duty": duty_str, + "slot": duty["slot"], + "duty_type": duty["type"], + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_bn_call_logs(loki_url: str, headers: dict, cluster_name: str, network: str, + start_ns: int, end_ns: int, limit: int) -> list[dict]: + """Query Loki for beacon node call timing logs.""" + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|~ `Beacon node call finished|Beacon node call took longer|Calling beacon node endpoint`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, 
line in stream.get("values", []): + msg = extract_logfmt(line, "msg") + duty_str = extract_logfmt(line, "duty") + duty = parse_duty_string(duty_str) + + event_type = "" + if "Calling beacon node endpoint" in msg: + event_type = "bn_call_start" + elif "took longer" in msg: + event_type = "bn_call_slow" + elif "call finished" in msg: + event_type = "bn_call_done" + + events.append({ + "timestamp_ns": get_event_timestamp_ns(ts_str, line), + "peer": peer, + "duty": duty_str, + "slot": duty["slot"], + "duty_type": duty["type"], + "event_type": event_type, + "endpoint": extract_logfmt(line, "endpoint"), + "rtt": extract_logfmt(line, "rtt"), + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_slot_logs(loki_url: str, headers: dict, cluster_name: str, network: str, + slot: int, limit: int) -> list[dict]: + """Query all consensus-related logs for a specific slot.""" + genesis = GENESIS_TIME.get(network) + if not genesis: + return [] + + slot_ts = genesis + slot * SECONDS_PER_SLOT + start_ns = (slot_ts - 15) * 1_000_000_000 + end_ns = (slot_ts + 120) * 1_000_000_000 + + logql = ( + f'{{cluster_name="{cluster_name}",cluster_network="{network}"}} ' + f'|~ `{slot}/attester|{slot}/proposer|{slot}/sync_contribution|slot={slot}|block_slot={slot}` ' + f'|~ `timeout waiting for local|QBFT|consensus|Beacon node call|Calling beacon`' + ) + raw = loki_query(loki_url, headers, logql, start_ns, end_ns, limit) + if not raw: + return [] + + events = [] + for stream in raw.get("data", {}).get("result", []): + peer = stream.get("stream", {}).get("cluster_peer", "unknown") + for ts_str, line in stream.get("values", []): + msg = extract_logfmt(line, "msg") + level = extract_logfmt(line, "level") + duty_str = extract_logfmt(line, "duty") + event_ts_ns = get_event_timestamp_ns(ts_str, line) + offset_s = (event_ts_ns / 1_000_000_000) - slot_ts + + events.append({ + "offset_s": round(offset_s, 3), + "timestamp_ns": event_ts_ns, + "peer": peer, + "level": level, + 
"msg": msg, + "duty": duty_str, + "round": extract_logfmt(line, "round"), + "new_round": extract_logfmt(line, "new_round"), + "rule": extract_logfmt(line, "rule"), + "timeout_reason": extract_logfmt(line, "timeout_reason"), + "endpoint": extract_logfmt(line, "endpoint"), + "rtt": extract_logfmt(line, "rtt"), + "leader_name": extract_logfmt(line, "leader_name"), + "leader_index": extract_logfmt(line, "leader_index"), + }) + + events.sort(key=lambda x: x["timestamp_ns"]) + return events + + +def query_prometheus_metrics(prom_url: str, headers: dict, cluster_name: str, network: str, + start: int, end: int) -> dict: + """Query Prometheus for aggregate consensus and BN metrics.""" + metrics = {} + + # Consensus timeout rate + q = f'sum(rate(core_consensus_timeout_total{{cluster_name="{cluster_name}",cluster_network="{network}"}}[1h])) by (duty, timer)' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["consensus_timeout_rate_per_hour"] = [ + { + "duty": r.get("metric", {}).get("duty", "?"), + "timer": r.get("metric", {}).get("timer", "?"), + "rate": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + ] + + # Consensus decided rounds + q = f'core_consensus_decided_rounds{{cluster_name="{cluster_name}",cluster_network="{network}"}}' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["last_decided_rounds"] = [ + { + "duty": r.get("metric", {}).get("duty", "?"), + "timer": r.get("metric", {}).get("timer", "?"), + "round": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + ] + + # Consensus duration + q = f'histogram_quantile(0.99, sum(rate(core_consensus_duration_seconds_bucket{{cluster_name="{cluster_name}",cluster_network="{network}"}}[1h])) by (duty, timer, le))' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["consensus_duration_p99"] = [ + { + 
"duty": r.get("metric", {}).get("duty", "?"), + "timer": r.get("metric", {}).get("timer", "?"), + "seconds": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + ] + + # BN call latency p99 + q = f'histogram_quantile(0.99, sum(rate(app_eth2_latency_seconds_bucket{{cluster_name="{cluster_name}",cluster_network="{network}"}}[1h])) by (endpoint, le))' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["bn_latency_p99"] = [ + { + "endpoint": r.get("metric", {}).get("endpoint", "?"), + "seconds": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + if not (r.get("value", [0, "NaN"])[1] == "NaN") + ] + + # BN error rate + q = f'sum(rate(app_eth2_errors_total{{cluster_name="{cluster_name}",cluster_network="{network}"}}[1h])) by (endpoint)' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["bn_error_rate_per_hour"] = [ + { + "endpoint": r.get("metric", {}).get("endpoint", "?"), + "rate": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + if float(r.get("value", [0, 0])[1]) > 0 + ] + + # Total consensus timeouts + q = f'sum(core_consensus_timeout_total{{cluster_name="{cluster_name}",cluster_network="{network}"}}) by (duty, timer)' + result = prom_query(prom_url, headers, q) + if result and result.get("data", {}).get("result"): + metrics["consensus_timeout_totals"] = [ + { + "duty": r.get("metric", {}).get("duty", "?"), + "timer": r.get("metric", {}).get("timer", "?"), + "total": float(r.get("value", [0, 0])[1]), + } + for r in result["data"]["result"] + ] + + return metrics + + +def correlate_events(timeout_events: list, round_changes: list, decided_events: list, + bn_calls: list, consensus_timeouts: list) -> dict: + """Correlate timeout events with round changes, decisions, and BN calls.""" + # Group by slot for correlation + affected_slots = set() + for ev in timeout_events: + 
affected_slots.add((ev["slot"], ev["duty_type"])) + + correlations = {} + for slot, duty_type in sorted(affected_slots): + key = f"{slot}/{duty_type}" + corr = { + "slot": slot, + "duty_type": duty_type, + "timeout_peers": [], + "round_changes": [], + "decided": None, + "full_timeout": False, + "bn_calls": [], + } + + # Timeout peers + for ev in timeout_events: + if ev["slot"] == slot and ev["duty_type"] == duty_type: + corr["timeout_peers"].append(ev["peer"]) + + # Round changes for this slot + for ev in round_changes: + if ev["slot"] == slot and ev["duty_type"] == duty_type: + corr["round_changes"].append({ + "peer": ev["peer"], + "round": ev["round"], + "new_round": ev["new_round"], + "rule": ev["rule"], + "timeout_reason": ev["timeout_reason"], + }) + + # Did consensus decide? + for ev in decided_events: + if ev["slot"] == slot and ev["duty_type"] == duty_type: + corr["decided"] = { + "round": ev["round"], + "leader_name": ev["leader_name"], + "leader_index": ev["leader_index"], + } + break + + # Full consensus timeout? 
+ for ev in consensus_timeouts: + if ev["slot"] == slot and ev["duty_type"] == duty_type: + corr["full_timeout"] = True + break + + # BN calls for this slot + for ev in bn_calls: + if ev["slot"] == slot and ev["duty_type"] == duty_type: + corr["bn_calls"].append({ + "peer": ev["peer"], + "event_type": ev["event_type"], + "endpoint": ev["endpoint"], + "rtt": ev["rtt"], + }) + + corr["timeout_peers"] = sorted(set(corr["timeout_peers"])) + correlations[key] = corr + + return correlations + + +def build_summary(timeout_events: list, correlations: dict, metrics: dict, network: str) -> dict: + """Build a high-level summary of the analysis.""" + summary = { + "total_timeout_events": len(timeout_events), + "affected_slots": len(correlations), + "affected_duty_types": {}, + "affected_peers": {}, + "slots_that_decided": 0, + "slots_that_fully_timed_out": 0, + "decided_round_distribution": {}, + "pattern_analysis": [], + } + + for ev in timeout_events: + dt = ev["duty_type"] + summary["affected_duty_types"][dt] = summary["affected_duty_types"].get(dt, 0) + 1 + p = ev["peer"] + summary["affected_peers"][p] = summary["affected_peers"].get(p, 0) + 1 + + for key, corr in correlations.items(): + if corr["decided"]: + summary["slots_that_decided"] += 1 + r = corr["decided"]["round"] + summary["decided_round_distribution"][r] = summary["decided_round_distribution"].get(r, 0) + 1 + if corr["full_timeout"]: + summary["slots_that_fully_timed_out"] += 1 + + # Pattern analysis + if summary["total_timeout_events"] > 0: + # Check if one peer is disproportionately affected + max_peer = max(summary["affected_peers"].items(), key=lambda x: x[1]) if summary["affected_peers"] else None + if max_peer and max_peer[1] > summary["total_timeout_events"] * 0.5: + summary["pattern_analysis"].append( + f"Peer '{max_peer[0]}' accounts for {max_peer[1]}/{summary['total_timeout_events']} " + f"({100*max_peer[1]//summary['total_timeout_events']}%) of timeouts - likely has a slow BN" + ) + + # Check if 
mostly attester (expected) + if "attester" in summary["affected_duty_types"]: + att_pct = summary["affected_duty_types"]["attester"] / summary["total_timeout_events"] + if att_pct > 0.9: + summary["pattern_analysis"].append( + "Timeouts are predominantly on attester duties (expected - only duty type with compare enabled)" + ) + + # Check recovery rate + if summary["affected_slots"] > 0: + recovery_rate = summary["slots_that_decided"] / summary["affected_slots"] + if recovery_rate > 0.9: + summary["pattern_analysis"].append( + f"High recovery rate: {summary['slots_that_decided']}/{summary['affected_slots']} " + f"({100*recovery_rate:.0f}%) slots eventually reached consensus despite compare timeout" + ) + elif recovery_rate < 0.5: + summary["pattern_analysis"].append( + f"Low recovery rate: only {summary['slots_that_decided']}/{summary['affected_slots']} " + f"({100*recovery_rate:.0f}%) slots reached consensus - indicates systemic issue" + ) + + # Check BN latency from metrics + if metrics.get("bn_latency_p99"): + slow_endpoints = [m for m in metrics["bn_latency_p99"] if m["seconds"] > 0.5] + if slow_endpoints: + summary["pattern_analysis"].append( + f"Slow BN endpoints detected (p99 > 500ms): " + + ", ".join(f"{e['endpoint']}={e['seconds']:.2f}s" for e in slow_endpoints[:5]) + ) + + return summary + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Diagnose QBFT compare timeout issues") + parser.add_argument("cluster_name", help="Cluster name (e.g. 
'Lido x Obol: Ethereal Elf')") + parser.add_argument("--network", default="mainnet", help="Network (default: mainnet)") + parser.add_argument("--hours", type=int, default=24, help="Hours to look back (default: 24)") + parser.add_argument("--slot", type=int, default=None, help="Analyze a specific slot") + parser.add_argument("--limit", type=int, default=5000, help="Max log entries per query (default: 5000)") + args = parser.parse_args() + + headers = get_auth_header() + if not headers: + print(json.dumps({"error": "OBOL_GRAFANA_API_TOKEN environment variable is not set"})) + sys.exit(1) + + prom_url, loki_url = discover_datasources(headers) + if not prom_url and not loki_url: + print(json.dumps({"error": "Could not discover Prometheus or Loki datasources from Grafana"})) + sys.exit(1) + + now_ts = int(datetime.now(tz=timezone.utc).timestamp()) + + if args.slot is not None: + # Specific slot analysis + genesis = GENESIS_TIME.get(args.network) + if not genesis: + print(json.dumps({"error": f"Unknown genesis time for network '{args.network}'"})) + sys.exit(1) + + slot_ts = genesis + args.slot * SECONDS_PER_SLOT + slot_time = datetime.fromtimestamp(slot_ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + slot_events = [] + if loki_url: + slot_events = query_slot_logs(loki_url, headers, args.cluster_name, args.network, args.slot, args.limit) + + output = { + "mode": "slot_analysis", + "cluster_name": args.cluster_name, + "network": args.network, + "slot": args.slot, + "slot_time": slot_time, + "epoch": args.slot // SLOTS_PER_EPOCH, + "events": slot_events, + "event_count": len(slot_events), + } + + print(json.dumps(output, indent=2)) + return + + # Time range analysis + start_ts = now_ts - args.hours * 3600 + start_ns = start_ts * 1_000_000_000 + end_ns = now_ts * 1_000_000_000 + + print(f"Querying timeouts for cluster '{args.cluster_name}' on {args.network} over last {args.hours}h...", file=sys.stderr) + + timeout_events = [] + round_changes = [] + decided_events = 
[] + bn_calls = [] + consensus_timeouts = [] + + if loki_url: + print(" Fetching compare timeout events...", file=sys.stderr) + timeout_events = query_timeout_events(loki_url, headers, args.cluster_name, args.network, start_ns, end_ns, args.limit) + print(f" Found {len(timeout_events)} timeout events", file=sys.stderr) + + if timeout_events: + # Only query correlated events if there are timeouts + print(" Fetching round changes...", file=sys.stderr) + round_changes = query_round_changes(loki_url, headers, args.cluster_name, args.network, start_ns, end_ns, args.limit) + print(f" Found {len(round_changes)} round changes", file=sys.stderr) + + print(" Fetching consensus decisions...", file=sys.stderr) + decided_events = query_consensus_decided(loki_url, headers, args.cluster_name, args.network, start_ns, end_ns, args.limit) + print(f" Found {len(decided_events)} decisions", file=sys.stderr) + + print(" Fetching consensus timeouts...", file=sys.stderr) + consensus_timeouts = query_consensus_timeouts(loki_url, headers, args.cluster_name, args.network, start_ns, end_ns, args.limit) + print(f" Found {len(consensus_timeouts)} full consensus timeouts", file=sys.stderr) + + print(" Fetching BN call logs...", file=sys.stderr) + bn_calls = query_bn_call_logs(loki_url, headers, args.cluster_name, args.network, start_ns, end_ns, args.limit) + print(f" Found {len(bn_calls)} BN call events", file=sys.stderr) + + # Prometheus metrics + prom_metrics = {} + if prom_url: + print(" Fetching Prometheus metrics...", file=sys.stderr) + prom_metrics = query_prometheus_metrics(prom_url, headers, args.cluster_name, args.network, start_ts, now_ts) + + # Correlate events + correlations = {} + if timeout_events: + print(" Correlating events...", file=sys.stderr) + correlations = correlate_events(timeout_events, round_changes, decided_events, bn_calls, consensus_timeouts) + + # Build summary + summary = build_summary(timeout_events, correlations, prom_metrics, args.network) + + output = { + 
"mode": "time_range_analysis", + "cluster_name": args.cluster_name, + "network": args.network, + "time_range": { + "start": datetime.fromtimestamp(start_ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "end": datetime.fromtimestamp(now_ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "hours": args.hours, + }, + "timeout_events": timeout_events, + "correlations": correlations, + "metrics": prom_metrics, + "summary": summary, + } + + print(json.dumps(output, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/scripts/loki_duplicate_sig_analysis.py b/scripts/loki_duplicate_sig_analysis.py new file mode 100644 index 0000000000..728aa3b392 --- /dev/null +++ b/scripts/loki_duplicate_sig_analysis.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +""" +Analyze "Ignoring duplicate partial signature" spam across multiple clusters +using Grafana Loki queries. + +Usage: + export $(cat /Users/oisinkyne/code/ObolNetwork/charon/.env | xargs) + python3 scripts/loki_duplicate_sig_analysis.py +""" + +import json +import os +import sys +import urllib.request +import urllib.parse +import urllib.error +from datetime import datetime, timezone + +# --- Configuration --- +GRAFANA_BASE = "https://grafana.monitoring.gcp.obol.tech" +LOKI_PROXY = "/api/datasources/proxy/14/loki/api/v1" +TOKEN = os.environ.get("OBOL_GRAFANA_API_TOKEN", "") + +if not TOKEN: + print("ERROR: OBOL_GRAFANA_API_TOKEN not set. 
Run:") + print(" export $(cat /Users/oisinkyne/code/ObolNetwork/charon/.env | xargs)") + sys.exit(1) + +CLUSTERS = [ + "Lido x Obol: Azure Albatross", + "Lido x Obol: Arctic Amarok", + "Etherfi-Obol-curated-EU-03", + "Lido x Obol: Bold Banshee", +] + +# Time windows (nanosecond epoch for Loki) +# Inflection analysis: before (07:00-09:00) vs after (09:00-12:00) on 2026-03-09 +BEFORE_START = int(datetime(2026, 3, 9, 7, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) +BEFORE_END = int(datetime(2026, 3, 9, 9, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) +AFTER_START = int(datetime(2026, 3, 9, 9, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) +AFTER_END = int(datetime(2026, 3, 9, 12, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) + +# Wider window for duty-type breakdown +WIDE_START = int(datetime(2026, 3, 9, 7, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) +WIDE_END = int(datetime(2026, 3, 9, 12, 0, 0, tzinfo=timezone.utc).timestamp() * 1e9) + + +def loki_query_range(query, start_ns, end_ns, step="60s", limit=5000): + """Execute a Loki query_range and return parsed JSON.""" + params = urllib.parse.urlencode({ + "query": query, + "start": str(start_ns), + "end": str(end_ns), + "step": step, + "limit": str(limit), + }) + url = f"{GRAFANA_BASE}{LOKI_PROXY}/query_range?{params}" + req = urllib.request.Request(url, headers={ + "Authorization": f"Bearer {TOKEN}", + "Accept": "application/json", + }) + try: + with urllib.request.urlopen(req, timeout=60) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body = e.read().decode() if e.fp else "" + print(f" HTTP {e.code}: {body[:300]}") + return None + except Exception as e: + print(f" Error: {e}") + return None + + +def loki_query(query, start_ns, end_ns, limit=5000): + """Execute a Loki instant-style log query (query_range for logs).""" + params = urllib.parse.urlencode({ + "query": query, + "start": str(start_ns), + "end": str(end_ns), + "limit": str(limit), + "direction": "forward", + }) 
+    url = f"{GRAFANA_BASE}{LOKI_PROXY}/query_range?{params}"
+    req = urllib.request.Request(url, headers={
+        "Authorization": f"Bearer {TOKEN}",
+        "Accept": "application/json",
+    })
+    try:
+        with urllib.request.urlopen(req, timeout=120) as resp:
+            return json.loads(resp.read().decode())
+    except urllib.error.HTTPError as e:
+        body = e.read().decode() if e.fp else ""
+        print(f"  HTTP {e.code}: {body[:300]}")
+        return None
+    except Exception as e:
+        print(f"  Error: {e}")
+        return None
+
+
+def count_from_metric_result(result):
+    """Return the total count from a metric query result (count_over_time).
+
+    Every query in this script uses a lookback range equal to the analysis
+    window, with step set to the same value. Loki evaluates a query_range at
+    both the start and end instants, and the point at the start looks back
+    *before* the window, so summing every point would roughly double count.
+    Only the final point per series - which covers exactly (start, end] -
+    is counted here.
+    """
+    if not result or result.get("status") != "success":
+        return None
+    data = result.get("data", {})
+    results = data.get("result", [])
+    total = 0.0
+    for series in results:
+        values = series.get("values", [])
+        if values:
+            # values are [timestamp, value] pairs; take the last evaluation point
+            total += float(values[-1][1])
+    return total
+
+
+def extract_log_lines(result):
+    """Extract raw log lines from a streams result."""
+    if not result or result.get("status") != "success":
+        return []
+    data = result.get("data", {})
+    results = data.get("result", [])
+    lines = []
+    for stream in results:
+        for ts, line in stream.get("values", []):
+            lines.append(line)
+    return lines
+
+
+def parse_ts_from_line(line):
+    """Extract the ts= timestamp from a log line."""
+    idx = line.find("ts=")
+    if idx == -1:
+        return None
+    rest = line[idx+3:]
+    # Find end of timestamp (space or end of string)
+    end = rest.find(" ")
+    if end == -1:
+        end = len(rest)
+    ts_str = rest[:end].strip('"')
+    try:
+        return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
+    except ValueError:
+        return None
+
+
+def parse_duty_from_line(line):
+    """Extract duty type from a log line (e.g., duty=prepare_aggregator)."""
+    idx = line.find("duty=")
+    if idx == -1:
+        # Try looking for duty type in other formats
+        for dtype in ["prepare_aggregator", "attester", "proposer", "sync_contribution",
+                      "sync_aggregator", "builder_proposer", "prepare_sync_contribution"]:
+            if dtype in line:
+                return dtype
+    
return "unknown"
+    rest = line[idx+5:]
+    end = rest.find(" ")
+    if end == -1:
+        end = len(rest)
+    return rest[:end].strip('"')
+
+
+# ============================================================
+print("=" * 80)
+print("LOKI ANALYSIS: 'Ignoring duplicate partial signature' spam")
+print("Inflection point analysis: 2026-03-09 09:00 UTC")
+print("=" * 80)
+
+# --- TASK 1: Rate comparison before vs after inflection point ---
+print("\n" + "=" * 80)
+print("TASK 1: Rate of duplicate signature messages - Before vs After inflection")
+print("        Before: 07:00-09:00 UTC | After: 09:00-12:00 UTC")
+print("=" * 80)
+
+for cluster in CLUSTERS:
+    print(f"\n--- Cluster: {cluster} ---")
+    # Escape quotes in cluster name for LogQL
+    escaped = cluster.replace('"', '\\"')
+
+    # Count log lines matching the duplicate sig message; the count_over_time
+    # range matches each window length (2h before, 3h after)
+    base_filter = f'{{cluster="{escaped}"}} |= "Ignoring duplicate partial signature"'
+
+    # Before period
+    q_before = f'count_over_time({base_filter}[2h])'
+    print("  Querying BEFORE period...")
+    res_before = loki_query_range(q_before, BEFORE_START, BEFORE_END, step="7200s")
+    count_before = count_from_metric_result(res_before)
+
+    # After period
+    q_after = f'count_over_time({base_filter}[3h])'
+    print("  Querying AFTER period...")
+    res_after = loki_query_range(q_after, AFTER_START, AFTER_END, step="10800s")
+    count_after = count_from_metric_result(res_after)
+
+    if count_before is not None and count_after is not None:
+        print(f"  BEFORE (07-09 UTC): {count_before:.0f} messages (2h window)")
+        print(f"  AFTER  (09-12 UTC): {count_after:.0f} messages (3h window)")
+        before_rate = count_before / 2 if count_before else 0
+        after_rate = count_after / 3 if count_after else 0
+        print(f"  Hourly rate BEFORE: {before_rate:.0f}/hr")
+        print(f"  Hourly rate AFTER:  {after_rate:.0f}/hr")
+        if before_rate > 0:
+            print(f"  Rate change: {((after_rate - before_rate) / before_rate * 100):+.1f}%")
+        else:
+            print("  Rate change: N/A (no messages before)")
+    else:
+        print("  Could not retrieve data. Trying alternative label...")
+        # Try with cluster_name label instead
+        base_filter2 = f'{{cluster_name="{escaped}"}} |= "Ignoring duplicate partial signature"'
+        q_before2 = f'count_over_time({base_filter2}[2h])'
+        q_after2 = f'count_over_time({base_filter2}[3h])'
+        res_before2 = loki_query_range(q_before2, BEFORE_START, BEFORE_END, step="7200s")
+        res_after2 = loki_query_range(q_after2, AFTER_START, AFTER_END, step="10800s")
+        count_before2 = count_from_metric_result(res_before2)
+        count_after2 = count_from_metric_result(res_after2)
+        if count_before2 is not None:
+            print("  (via cluster_name label)")
+            print(f"  BEFORE (07-09 UTC): {count_before2:.0f} messages")
+            print(f"  AFTER  (09-12 UTC): {count_after2:.0f} messages")
+        else:
+            print("  Still no data. Label might differ - check available labels.")
+
+
+# --- TASK 2: During timeout slots, is output dominated by duplicate sig spam? 
--- +print("\n" + "=" * 80) +print("TASK 2: Log composition during timeout windows") +print(" Checking if duplicate sig spam dominates during timeout periods") +print("=" * 80) + +for cluster in CLUSTERS: + print(f"\n--- Cluster: {cluster} ---") + escaped = cluster.replace('"', '\\"') + + # Get total log volume and duplicate sig volume in the post-inflection window + # Total logs + q_total = f'count_over_time({{cluster="{escaped}"}}[3h])' + q_dup = f'count_over_time({{cluster="{escaped}"}} |= "Ignoring duplicate partial signature"[3h])' + q_timeout = f'count_over_time({{cluster="{escaped}"}} |= "timeout"[3h])' + + print(f" Querying log composition (09:00-12:00 UTC)...") + res_total = loki_query_range(q_total, AFTER_START, AFTER_END, step="10800s") + res_dup = loki_query_range(q_dup, AFTER_START, AFTER_END, step="10800s") + res_timeout = loki_query_range(q_timeout, AFTER_START, AFTER_END, step="10800s") + + total = count_from_metric_result(res_total) + dup = count_from_metric_result(res_dup) + timeout = count_from_metric_result(res_timeout) + + if total is not None and total > 0: + dup_pct = (dup / total * 100) if dup else 0 + timeout_pct = (timeout / total * 100) if timeout else 0 + print(f" Total log lines: {total:.0f}") + print(f" Duplicate sig lines: {dup:.0f} ({dup_pct:.1f}%)") + print(f" Timeout-related lines: {timeout:.0f} ({timeout_pct:.1f}%)") + if dup_pct > 30: + print(f" ** CONFIRMED: Duplicate sig spam dominates log output ({dup_pct:.1f}%)") + elif dup_pct > 10: + print(f" ** NOTABLE: Significant duplicate sig presence ({dup_pct:.1f}%)") + else: + print(f" Duplicate sig messages are not dominant ({dup_pct:.1f}%)") + else: + print(f" No data or zero total. 
Trying cluster_name label...") + q_total2 = f'count_over_time({{cluster_name="{escaped}"}}[3h])' + q_dup2 = f'count_over_time({{cluster_name="{escaped}"}} |= "Ignoring duplicate partial signature"[3h])' + q_timeout2 = f'count_over_time({{cluster_name="{escaped}"}} |= "timeout"[3h])' + res_total2 = loki_query_range(q_total2, AFTER_START, AFTER_END, step="10800s") + res_dup2 = loki_query_range(q_dup2, AFTER_START, AFTER_END, step="10800s") + res_timeout2 = loki_query_range(q_timeout2, AFTER_START, AFTER_END, step="10800s") + total2 = count_from_metric_result(res_total2) + dup2 = count_from_metric_result(res_dup2) + timeout2 = count_from_metric_result(res_timeout2) + if total2 and total2 > 0: + dup_pct2 = (dup2 / total2 * 100) if dup2 else 0 + timeout_pct2 = (timeout2 / total2 * 100) if timeout2 else 0 + print(f" (via cluster_name label)") + print(f" Total log lines: {total2:.0f}") + print(f" Duplicate sig lines: {dup2:.0f} ({dup_pct2:.1f}%)") + print(f" Timeout-related lines: {timeout2:.0f} ({timeout_pct2:.1f}%)") + else: + print(f" Still no data.") + + +# --- TASK 3: Duty type breakdown of duplicate sig messages --- +print("\n" + "=" * 80) +print("TASK 3: Duty type breakdown of duplicate signature messages") +print(" Checking if prepare_aggregator is specifically affected") +print("=" * 80) + +DUTY_TYPES = [ + "prepare_aggregator", + "attester", + "proposer", + "sync_contribution", + "sync_aggregator", + "builder_proposer", + "prepare_sync_contribution", +] + +for cluster in CLUSTERS: + print(f"\n--- Cluster: {cluster} ---") + escaped = cluster.replace('"', '\\"') + + duty_counts = {} + for duty in DUTY_TYPES: + q = f'count_over_time({{cluster="{escaped}"}} |= "Ignoring duplicate partial signature" |= "{duty}"[5h])' + res = loki_query_range(q, WIDE_START, WIDE_END, step="18000s") + cnt = count_from_metric_result(res) + if cnt is None: + # Try cluster_name + q2 = f'count_over_time({{cluster_name="{escaped}"}} |= "Ignoring duplicate partial signature" |= 
"{duty}"[5h])' + res2 = loki_query_range(q2, WIDE_START, WIDE_END, step="18000s") + cnt = count_from_metric_result(res2) + duty_counts[duty] = cnt if cnt is not None else 0 + + total_typed = sum(duty_counts.values()) + print(f" Duty type breakdown (07:00-12:00 UTC):") + for duty, cnt in sorted(duty_counts.items(), key=lambda x: -x[1]): + pct = (cnt / total_typed * 100) if total_typed > 0 else 0 + bar = "#" * int(pct / 2) + print(f" {duty:<30s}: {cnt:>8.0f} ({pct:5.1f}%) {bar}") + + if total_typed > 0: + top_duty = max(duty_counts, key=duty_counts.get) + top_pct = duty_counts[top_duty] / total_typed * 100 + if top_duty == "prepare_aggregator" and top_pct > 50: + print(f" ** CONFIRMED: prepare_aggregator dominates ({top_pct:.1f}%)") + elif top_pct > 50: + print(f" ** Top duty type: {top_duty} ({top_pct:.1f}%)") + else: + print(f" ** Multiple duty types affected - no single dominant type") + else: + print(f" No duty-typed duplicate sig messages found.") + + +# --- TASK 3b: Sample actual log lines to verify duty extraction --- +print("\n" + "=" * 80) +print("TASK 3b: Sample log lines for duty type verification") +print("=" * 80) + +# Just sample from one cluster +sample_cluster = CLUSTERS[0] +escaped = sample_cluster.replace('"', '\\"') +q_sample = f'{{cluster="{escaped}"}} |= "Ignoring duplicate partial signature"' +print(f" Sampling from: {sample_cluster}") +res_sample = loki_query(q_sample, AFTER_START, AFTER_END, limit=20) +lines = extract_log_lines(res_sample) +if not lines: + q_sample2 = f'{{cluster_name="{escaped}"}} |= "Ignoring duplicate partial signature"' + res_sample2 = loki_query(q_sample2, AFTER_START, AFTER_END, limit=20) + lines = extract_log_lines(res_sample2) + +if lines: + print(f" Got {len(lines)} sample lines. 
Parsing duty types and timestamps:") + duty_dist = {} + for line in lines[:20]: + ts = parse_ts_from_line(line) + duty = parse_duty_from_line(line) + duty_dist[duty] = duty_dist.get(duty, 0) + 1 + ts_str = ts.strftime("%H:%M:%S") if ts else "?" + # Print truncated line + print(f" [{ts_str}] duty={duty} | {line[:120]}...") + print(f"\n Duty distribution in sample:") + for d, c in sorted(duty_dist.items(), key=lambda x: -x[1]): + print(f" {d}: {c}") +else: + print(f" No sample lines retrieved.") + + +print("\n" + "=" * 80) +print("ANALYSIS COMPLETE") +print("=" * 80)