-
Notifications
You must be signed in to change notification settings - Fork 15
Fix/backport prod fixes #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,23 @@ pub async fn announce( | |
| )); | ||
| } | ||
|
|
||
| // Reject self-announcements: a peer row whose http_url is our own public | ||
| // URL makes the HTTP-notify path fan out to ourselves. Seen in prod when | ||
| // misconfigured dev nodes announce with their upstream's URL. | ||
| // prune_self_peers clears stale rows at boot; this stops new ones. | ||
| if let Some(self_url) = state.config.public_url.as_deref() { | ||
| if req.http_url.trim_end_matches('/') == self_url.trim_end_matches('/') { | ||
| return Err(AppError::BadRequest( | ||
| "http_url is this node's own public URL; refusing to register self as peer".into(), | ||
| )); | ||
| } | ||
| } | ||
| if announced_did.to_string() == state.node_did.to_string() { | ||
| return Err(AppError::BadRequest( | ||
| "did is this node's own DID; refusing to register self as peer".into(), | ||
| )); | ||
| } | ||
|
|
||
| state.db.upsert_peer(&req.did, &req.http_url).await?; | ||
|
|
||
| tracing::info!(did = %req.did, url = %req.http_url, "peer announced"); | ||
|
|
@@ -124,16 +141,30 @@ pub async fn trigger_sync(State(state): State<AppState>) -> Result<Json<serde_js | |
| continue; | ||
| } | ||
| let url = format!("{}/api/v1/repos", peer.http_url.trim_end_matches('/')); | ||
| let result = | ||
| tokio::time::timeout(std::time::Duration::from_secs(5), client.get(&url).send()).await; | ||
| // 30s with the body read inside the timeout: 5s only covered the | ||
| // response headers, so canonical nodes serving large unpaginated repo | ||
| // lists (and transpacific round trips) aborted mid-body. | ||
| let result = tokio::time::timeout(std::time::Duration::from_secs(30), async { | ||
| let resp = client.get(&url).send().await?; | ||
| if !resp.status().is_success() { | ||
| return Err(anyhow::anyhow!("peer returned status {}", resp.status())); | ||
| } | ||
| let repos: Vec<serde_json::Value> = resp.json().await?; | ||
| Ok::<_, anyhow::Error>(repos) | ||
| }) | ||
| .await; | ||
|
|
||
| let repos: Vec<serde_json::Value> = match result { | ||
| Ok(Ok(resp)) if resp.status().is_success() => { | ||
| let repos = match result { | ||
| Ok(Ok(repos)) => { | ||
| peers_reached += 1; | ||
| resp.json().await.unwrap_or_default() | ||
| repos | ||
| } | ||
| Ok(Err(e)) => { | ||
| tracing::warn!(peer = %peer.did, err = %e, "trigger_sync: peer fetch failed"); | ||
| continue; | ||
| } | ||
| _ => { | ||
| tracing::warn!(peer = %peer.did, "trigger_sync: could not reach peer"); | ||
| Err(_) => { | ||
| tracing::warn!(peer = %peer.did, "trigger_sync: peer timed out"); | ||
| continue; | ||
| } | ||
| }; | ||
|
|
@@ -183,6 +214,18 @@ pub struct NotifyRequest { | |
| pub ref_name: String, | ||
| pub new_sha: String, | ||
| pub node_did: String, | ||
| // Optional fields — older senders only included the four above. New | ||
| // senders include these so received_ref_updates has full provenance | ||
| // even when the libp2p mesh isn't delivering and the HTTP fallback | ||
| // is the only path that fired. | ||
| #[serde(default)] | ||
| pub pusher_did: Option<String>, | ||
| #[serde(default)] | ||
| pub old_sha: Option<String>, | ||
| #[serde(default)] | ||
| pub timestamp: Option<String>, | ||
| #[serde(default)] | ||
| pub cert_id: Option<String>, | ||
| } | ||
|
|
||
| pub async fn notify_sync( | ||
|
|
@@ -218,6 +261,27 @@ pub async fn notify_sync( | |
| .enqueue_sync(&req.repo, &req.node_did, &req.ref_name, &req.new_sha, None) | ||
| .await?; | ||
|
|
||
| // Mirror the gossipsub-receive handler: insert the same record we'd | ||
| // get from the libp2p path, so /api/v1/events/ref-updates reflects | ||
| // pushes that arrive over either transport. | ||
| let now = chrono::Utc::now().to_rfc3339(); | ||
| let update = crate::db::ReceivedRefUpdate { | ||
| id: uuid::Uuid::new_v4().to_string(), | ||
| node_did: req.node_did.clone(), | ||
| pusher_did: req.pusher_did.clone().unwrap_or_default(), | ||
| repo: req.repo.clone(), | ||
| ref_name: req.ref_name.clone(), | ||
| old_sha: req.old_sha.clone().unwrap_or_default(), | ||
| new_sha: req.new_sha.clone(), | ||
| timestamp: req.timestamp.clone().unwrap_or_else(|| now.clone()), | ||
| cert_id: req.cert_id.clone(), | ||
| received_at: now, | ||
| from_peer: format!("http:{}", req.node_did), | ||
| }; | ||
| if let Err(e) = state.db.insert_ref_update(&update).await { | ||
| tracing::warn!(err = %e, repo = %req.repo, "failed to insert ref-update from sync notify"); | ||
| } | ||
|
Comment on lines
+264
to
+283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
# Find push fanout code that emits ref updates.
rg -n --type rust -C4 'publish_ref_update\s*\(|sync/notify|insert_ref_update\s*\(' crates
# Inspect readers of received_ref_updates to see whether they dedupe by content/source.
rg -n --type rust -C4 'received_ref_updates|from_peer' crates/gitlawb-node/srcRepository: Gitlawb/node Length of output: 18423 Prevent duplicate
🤖 Prompt for AI Agents |
||
|
|
||
| tracing::info!( | ||
| repo = %req.repo, | ||
| peer = %req.node_did, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: Gitlawb/node
Length of output: 13556
Timeout diagnostics split likely masked by shared reqwest timeout
crates/gitlawb-node/src/main.rsconstructsstate.http_clientwithreqwest::Client::builder().timeout(Duration::from_secs(10)), which will typically makeclient.get(...).send().await?fail before the outertokio::time::timeout(30s, ...)can fire—so slow peers will be logged aspeer fetch failed(Ok(Err(_))) rather thanpeer timed out(Err(_)). There’s also an alternateAppStatepath incrates/gitlawb-node/src/auth/mod.rsthat usesreqwest::Client::new()(no explicit timeout), so the outer timeout branch may only be reachable when that state is used fortrigger_sync.🤖 Prompt for AI Agents