From d09497d3d7f69db521b33e11c0ddc0d6f74dcd19 Mon Sep 17 00:00:00 2001 From: nyxst4ck <289980115+nyxst4ck@users.noreply.github.com> Date: Fri, 12 Jun 2026 09:21:29 -0300 Subject: [PATCH] fix(llm): isolate batch failures in Stage 2 and keep unanalysed findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One exception anywhere in the arun_batches fan-out aborted the whole Stage 2 pass: asyncio.gather without return_exceptions cancelled the remaining batches, the meta-analyzer's blanket except caught the propagated error, and every file silently fell back to static-only results while the CLI still exited 0 (#9). arun_batches now isolates failures per batch: a transient error (timeout, 429, oversized-chunk 400) is logged and costs only its own batch. ValueError and NotImplementedError still propagate, since they signal misconfiguration rather than infra trouble. With partial results possible, apply_filter could no longer treat a missing confirmation as a rejection: a finding whose batch never returned would be silently dropped — a false negative manufactured by an infrastructure error (#11). The meta-analyzer now partitions findings by whether a returned batch actually carried them: analysed findings go through the normal confirm-or-drop filter, unanalysed ones are kept via the existing fallback path, and a WARNING logs how many findings were kept unfiltered so the gap is visible. Net effect: an infra failure can only ever cost enrichment on a file, never the finding itself, and one bad call no longer turns off the semantic filter for the whole scan. Fixes #9, fixes #11 Co-Authored-By: Claude Fable 5 Signed-off-by: nyxst4ck <289980115+nyxst4ck@users.noreply.github.com> --- src/skillspector/llm_analyzer_base.py | 19 ++- src/skillspector/nodes/meta_analyzer.py | 23 +++- tests/nodes/test_llm_analyzer_base.py | 36 ++++++ tests/nodes/test_meta_analyzer.py | 148 ++++++++++++++++++++++++ 4 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 tests/nodes/test_meta_analyzer.py diff --git a/src/skillspector/llm_analyzer_base.py b/src/skillspector/llm_analyzer_base.py index a678e4e..f5933ff 100644 --- a/src/skillspector/llm_analyzer_base.py +++ b/src/skillspector/llm_analyzer_base.py @@ -374,6 +374,14 @@ async def arun_batches( *max_concurrency* LLM requests in parallel. Both cross-file and cross-chunk batches are parallelized in a single gather call. + Failures are isolated per batch: a transient error (timeout, 429, + oversized-chunk 400, ...) costs only its own batch, which is logged + and omitted from the result, so one bad call cannot cancel the rest + of the fan-out. Callers can detect partial results by comparing the + returned batches against the submitted ones. ``ValueError`` and + ``NotImplementedError`` signal misconfiguration rather than infra + trouble and keep propagating. + The return type mirrors :meth:`run_batches`. """ sem = asyncio.Semaphore(max_concurrency) @@ -394,7 +402,16 @@ async def _process(batch: Batch) -> tuple[Batch, list]: logger.debug("LLM response for %s", batch.file_label) return (batch, self.parse_response(response, batch)) - return list(await asyncio.gather(*[_process(b) for b in batches])) + results = await asyncio.gather(*[_process(b) for b in batches], return_exceptions=True) + successful: list[tuple[Batch, list]] = [] + for batch, result in zip(batches, results, strict=True): + if isinstance(result, (ValueError, NotImplementedError)): + raise result + if isinstance(result, BaseException): + logger.warning("LLM batch failed for %s: %s", batch.file_label, result) + continue + successful.append(result) + return successful # -- Convenience -------------------------------------------------------- diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index 8f2b541..74f8c99 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -392,7 +392,28 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: ) batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text)) - filtered = analyzer.apply_filter(findings, batch_results) + + if len(batch_results) < len(batches): + # Some batches never returned. A finding the LLM never saw has no + # verdict — keep it via the fallback path instead of letting + # apply_filter treat the missing confirmation as a rejection. + analysed_ids = {id(f) for batch, _ in batch_results for f in batch.findings} + analysed = [f for f in findings if id(f) in analysed_ids] + unanalysed = [f for f in findings if id(f) not in analysed_ids] + else: + analysed, unanalysed = findings, [] + + filtered = analyzer.apply_filter(analysed, batch_results) + if unanalysed: + logger.warning( + "Meta-analyzer: %d/%d batches failed; keeping %d findings in %d " + "files unfiltered (no LLM verdict)", + len(batches) - len(batch_results), + len(batches), + len(unanalysed), + len({f.file for f in unanalysed}), + ) + filtered.extend(_fallback_filtered(unanalysed)) logger.debug( "LLM filtering done: %d findings -> %d after filter", diff --git a/tests/nodes/test_llm_analyzer_base.py b/tests/nodes/test_llm_analyzer_base.py index 9899a7f..4125b11 100644 --- a/tests/nodes/test_llm_analyzer_base.py +++ b/tests/nodes/test_llm_analyzer_base.py @@ -484,6 +484,42 @@ async def _delayed_ainvoke(prompt: str) -> LLMAnalysisResult: assert seen_files == {f"file_{i}.py" for i in range(num_batches)} + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + async def test_failed_batch_does_not_abort_the_others(self) -> None: + """A transient failure costs only its own batch, not the whole fan-out.""" + + async def _flaky_ainvoke(prompt: str) -> LLMAnalysisResult: + if "b.py" in prompt: + raise RuntimeError("429 Too Many Requests") + return LLMAnalysisResult(findings=[]) + + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.ainvoke = _flaky_ainvoke + + batches = [ + Batch(file_path="a.py", content="code a"), + Batch(file_path="b.py", content="code b"), + Batch(file_path="c.py", content="code c"), + ] + results = await analyzer.arun_batches(batches) + assert {batch.file_path for batch, _ in results} == {"a.py", "c.py"} + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + async def test_all_batches_failed_returns_empty(self) -> None: + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.ainvoke = AsyncMock(side_effect=RuntimeError("boom")) + batches = [Batch(file_path="a.py", content="code")] + assert await analyzer.arun_batches(batches) == [] + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + async def test_value_error_still_propagates(self) -> None: + """ValueError signals misconfiguration, not infra trouble — never swallowed.""" + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.ainvoke = AsyncMock(side_effect=ValueError("no API key")) + batches = [Batch(file_path="a.py", content="code")] + with pytest.raises(ValueError, match="no API key"): + await analyzer.arun_batches(batches) + # --------------------------------------------------------------------------- # _format_findings_for_prompt (per-file, no truncation) diff --git a/tests/nodes/test_meta_analyzer.py b/tests/nodes/test_meta_analyzer.py new file mode 100644 index 0000000..e7a3c59 --- /dev/null +++ b/tests/nodes/test_meta_analyzer.py @@ -0,0 +1,148 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the meta_analyzer node: batch-failure resilience of the filter. + +Covers the keep/drop semantics when Stage 2 batches fail (issues #9/#11): +a finding the LLM rejected is dropped, a finding the LLM never saw is kept +via the fallback path, and one failed batch must not disable filtering for +the batches that succeeded. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +from skillspector.llm_analyzer_base import Batch +from skillspector.models import Finding +from skillspector.nodes.meta_analyzer import LLMMetaAnalyzer, meta_analyzer + +MOCK_PATCH_TARGET = "skillspector.llm_analyzer_base.get_chat_model" + + +def _mock_get_chat_model(*_args, **_kwargs): + from unittest.mock import MagicMock + + mock_llm = MagicMock() + mock_llm.with_structured_output.return_value = MagicMock() + return mock_llm + + +def _confirm(pattern_id: str, file: str, start_line: int) -> dict[str, object]: + """LLM item confirming a finding, as parse_response would emit it.""" + return { + "pattern_id": pattern_id, + "is_vulnerability": True, + "confidence": 0.9, + "explanation": "confirmed by llm", + "remediation": "fix it", + "_file": file, + "start_line": start_line, + "end_line": None, + } + + +@patch(MOCK_PATCH_TARGET, _mock_get_chat_model) +class TestMetaAnalyzerPartialBatchFailure: + def _state(self, findings: list[Finding]) -> dict[str, object]: + return { + "findings": findings, + "use_llm": True, + "file_cache": {"a.py": "code a", "b.py": "code b"}, + "manifest": {}, + "model_config": {}, + } + + def test_unanalysed_findings_survive_a_failed_batch(self) -> None: + """Findings whose batch failed are kept (no verdict != rejection).""" + f_confirmed = Finding(rule_id="R1", message="m", file="a.py", start_line=1) + f_rejected = Finding(rule_id="R2", message="m", file="a.py", start_line=5) + f_unseen = Finding(rule_id="R1", message="m", file="b.py", start_line=3) + + batch_a = Batch(file_path="a.py", content="code a", findings=[f_confirmed, f_rejected]) + batch_b = Batch(file_path="b.py", content="code b", findings=[f_unseen]) + + # batch_b never returned (timeout/429): only batch_a's verdicts exist, + # and the LLM confirmed R1 but stayed silent on R2 (= rejection). + partial_results = [(batch_a, [_confirm("R1", "a.py", 1)])] + + with ( + patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]), + patch.object( + LLMMetaAnalyzer, + "arun_batches", + new_callable=AsyncMock, + return_value=partial_results, + ), + ): + result = meta_analyzer(self._state([f_confirmed, f_rejected, f_unseen])) + + filtered = result["filtered_findings"] + kept = {(f.file, f.rule_id) for f in filtered} + + # the real filter still applies to the batch that came back + assert ("a.py", "R1") in kept + assert ("a.py", "R2") not in kept + # the finding the LLM never saw must NOT be silently dropped + assert ("b.py", "R1") in kept + + confirmed = next(f for f in filtered if f.file == "a.py") + assert confirmed.explanation == "confirmed by llm" + + def test_all_batches_failed_keeps_everything_via_fallback(self) -> None: + f1 = Finding(rule_id="R1", message="m", file="a.py", start_line=1) + f2 = Finding(rule_id="R2", message="m", file="b.py", start_line=2) + batch_a = Batch(file_path="a.py", content="code a", findings=[f1]) + batch_b = Batch(file_path="b.py", content="code b", findings=[f2]) + + with ( + patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]), + patch.object( + LLMMetaAnalyzer, + "arun_batches", + new_callable=AsyncMock, + return_value=[], + ), + ): + result = meta_analyzer(self._state([f1, f2])) + + kept = {(f.file, f.rule_id) for f in result["filtered_findings"]} + assert kept == {("a.py", "R1"), ("b.py", "R2")} + + def test_no_failures_keeps_strict_confirm_or_drop(self) -> None: + """When every batch returns, unconfirmed findings are dropped as before.""" + f_confirmed = Finding(rule_id="R1", message="m", file="a.py", start_line=1) + f_rejected = Finding(rule_id="R2", message="m", file="b.py", start_line=2) + batch_a = Batch(file_path="a.py", content="code a", findings=[f_confirmed]) + batch_b = Batch(file_path="b.py", content="code b", findings=[f_rejected]) + + full_results = [ + (batch_a, [_confirm("R1", "a.py", 1)]), + (batch_b, []), + ] + + with ( + patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]), + patch.object( + LLMMetaAnalyzer, + "arun_batches", + new_callable=AsyncMock, + return_value=full_results, + ), + ): + result = meta_analyzer(self._state([f_confirmed, f_rejected])) + + kept = {(f.file, f.rule_id) for f in result["filtered_findings"]} + assert kept == {("a.py", "R1")}