diff --git a/crates/codegraph-graph/src/surrealdb_storage.rs b/crates/codegraph-graph/src/surrealdb_storage.rs index a92909f..2b73d1a 100644 --- a/crates/codegraph-graph/src/surrealdb_storage.rs +++ b/crates/codegraph-graph/src/surrealdb_storage.rs @@ -604,6 +604,56 @@ impl SurrealDbStorage { Ok(()) } + /// Resilient edge upsert for large codebases. + /// + /// Pre-chunks the input into slices of [`EDGE_BATCH_CHUNK_SIZE`] before + /// sending to SurrealDB. If a chunk still fails with a WebSocket + /// connection-reset or an "excessive computation depth" error (both caused + /// by SurrealDB's `FOR … IN $batch` loop hitting internal limits), the + /// chunk is split in half and retried up to 3 times. This mirrors the + /// existing `upsert_chunk_embeddings_resilient` strategy. + /// + /// Use this instead of `upsert_edges_batch` whenever the total number of + /// edges is not tightly bounded (e.g. during full project indexing). + pub async fn upsert_edges_batch_resilient(&mut self, edges: &[CodeEdge]) -> Result<()> { + if edges.is_empty() { + return Ok(()); + } + + // Iterative queue: (batch_slice, retries_remaining) + let mut queue: Vec<(Vec, u8)> = edges + .chunks(EDGE_BATCH_CHUNK_SIZE) + .map(|c| (c.to_vec(), 3u8)) + .collect(); + + while let Some((batch, remaining)) = queue.pop() { + if batch.is_empty() { + continue; + } + match self.upsert_edges_batch(&batch).await { + Ok(()) => {} + Err(err) => { + let msg = err.to_string(); + let msg_lower = msg.to_lowercase(); + let recoverable = msg_lower.contains("excessive computation depth") + || msg_lower.contains("computationdepth") + || msg_lower.contains("connection reset") + || msg_lower.contains("broken pipe"); + if recoverable && remaining > 0 && batch.len() > 1 { + let mid = batch.len() / 2; + let (left, right) = batch.split_at(mid); + queue.push((right.to_vec(), remaining - 1)); + queue.push((left.to_vec(), remaining - 1)); + } else { + return Err(err); + } + } + } + } + + Ok(()) + } + pub async fn upsert_symbol_embeddings_batch( &self, records: &[SymbolEmbeddingRecord], @@ -683,10 +733,12 @@ impl SurrealDbStorage { // First, try a bulk INSERT (shallow query, no FOR loop) if let Err(err) = self.insert_chunk_embeddings_batch(records).await { let msg = err.to_string(); - let duplicate = msg.to_lowercase().contains("duplicate"); - let depth_hit = msg.contains("excessive computation depth") - || msg.contains("ComputationDepth") - || msg.contains("connection reset"); + let msg_lower = msg.to_lowercase(); + let duplicate = msg_lower.contains("duplicate"); + let depth_hit = msg_lower.contains("excessive computation depth") + || msg_lower.contains("computationdepth") + || msg_lower.contains("connection reset") + || msg_lower.contains("broken pipe"); // Only fall through to upsert/backoff on duplicate or depth issues; otherwise fail fast if !duplicate && !depth_hit { @@ -708,9 +760,11 @@ impl SurrealDbStorage { Ok(()) => {} Err(err) => { let msg = err.to_string(); - let depth_hit = msg.contains("excessive computation depth") - || msg.contains("ComputationDepth") - || msg.contains("connection reset"); + let msg_lower = msg.to_lowercase(); + let depth_hit = msg_lower.contains("excessive computation depth") + || msg_lower.contains("computationdepth") + || msg_lower.contains("connection reset") + || msg_lower.contains("broken pipe"); if depth_hit && remaining > 0 && batch.len() > 1 { let mid = batch.len() / 2; @@ -919,7 +973,7 @@ impl SurrealDbStorage { } pub async fn add_code_edges(&mut self, edges: Vec) -> Result<()> { - self.upsert_edges_batch(&edges).await + self.upsert_edges_batch_resilient(&edges).await } pub async fn upsert_symbol_embedding(&self, record: SymbolEmbeddingUpsert<'_>) -> Result<()> { @@ -2148,6 +2202,12 @@ FOR $doc IN $batch { } "#; +/// Maximum number of edges sent in a single SurrealDB `FOR … IN $batch` query. +/// Larger batches cause WebSocket connection resets or "excessive computation +/// depth" errors in SurrealDB. `upsert_edges_batch_resilient` pre-chunks to +/// this size and halves further on transient errors. +const EDGE_BATCH_CHUNK_SIZE: usize = 2_000; + const UPSERT_EDGES_QUERY: &str = r#" LET $batch = $data; FOR $doc IN $batch { diff --git a/crates/codegraph-mcp/src/indexer.rs b/crates/codegraph-mcp/src/indexer.rs index e30167b..bd7e74d 100644 --- a/crates/codegraph-mcp/src/indexer.rs +++ b/crates/codegraph-mcp/src/indexer.rs @@ -281,7 +281,7 @@ impl SurrealWriterHandle { } if let Err(err) = { let mut guard = storage.lock().await; - guard.upsert_edges_batch(&edges).await + guard.upsert_edges_batch_resilient(&edges).await } { error!("Surreal edge batch failed: {}", err); last_error = Some(anyhow!(err.to_string()));