Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/skillspector/llm_analyzer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ async def arun_batches(
*max_concurrency* LLM requests in parallel. Both cross-file and
cross-chunk batches are parallelized in a single gather call.

Failures are isolated per batch: a transient error (timeout, 429,
oversized-chunk 400, ...) costs only its own batch, which is logged
and omitted from the result, so one bad call cannot cancel the rest
of the fan-out. Callers can detect partial results by comparing the
returned batches against the submitted ones. ``ValueError`` and
``NotImplementedError`` signal misconfiguration rather than infra
trouble and keep propagating.

The return type mirrors :meth:`run_batches`.
"""
sem = asyncio.Semaphore(max_concurrency)
Expand All @@ -394,7 +402,16 @@ async def _process(batch: Batch) -> tuple[Batch, list]:
logger.debug("LLM response for %s", batch.file_label)
return (batch, self.parse_response(response, batch))

return list(await asyncio.gather(*[_process(b) for b in batches]))
results = await asyncio.gather(*[_process(b) for b in batches], return_exceptions=True)
successful: list[tuple[Batch, list]] = []
for batch, result in zip(batches, results, strict=True):
if isinstance(result, (ValueError, NotImplementedError)):
raise result
if isinstance(result, BaseException):
logger.warning("LLM batch failed for %s: %s", batch.file_label, result)
continue
successful.append(result)
return successful

# -- Convenience --------------------------------------------------------

Expand Down
23 changes: 22 additions & 1 deletion src/skillspector/nodes/meta_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,28 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse:
)

batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text))
filtered = analyzer.apply_filter(findings, batch_results)

if len(batch_results) < len(batches):
# Some batches never returned. A finding the LLM never saw has no
# verdict — keep it via the fallback path instead of letting
# apply_filter treat the missing confirmation as a rejection.
analysed_ids = {id(f) for batch, _ in batch_results for f in batch.findings}
analysed = [f for f in findings if id(f) in analysed_ids]
unanalysed = [f for f in findings if id(f) not in analysed_ids]
else:
analysed, unanalysed = findings, []

filtered = analyzer.apply_filter(analysed, batch_results)
if unanalysed:
logger.warning(
"Meta-analyzer: %d/%d batches failed; keeping %d findings in %d "
"files unfiltered (no LLM verdict)",
len(batches) - len(batch_results),
len(batches),
len(unanalysed),
len({f.file for f in unanalysed}),
)
filtered.extend(_fallback_filtered(unanalysed))

logger.debug(
"LLM filtering done: %d findings -> %d after filter",
Expand Down
36 changes: 36 additions & 0 deletions tests/nodes/test_llm_analyzer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,42 @@ async def _delayed_ainvoke(prompt: str) -> LLMAnalysisResult:

assert seen_files == {f"file_{i}.py" for i in range(num_batches)}

@patch(MOCK_PATCH_TARGET, _mock_get_chat_model)
async def test_failed_batch_does_not_abort_the_others(self) -> None:
"""A transient failure costs only its own batch, not the whole fan-out."""

async def _flaky_ainvoke(prompt: str) -> LLMAnalysisResult:
if "b.py" in prompt:
raise RuntimeError("429 Too Many Requests")
return LLMAnalysisResult(findings=[])

analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL)
analyzer._structured_llm.ainvoke = _flaky_ainvoke

batches = [
Batch(file_path="a.py", content="code a"),
Batch(file_path="b.py", content="code b"),
Batch(file_path="c.py", content="code c"),
]
results = await analyzer.arun_batches(batches)
assert {batch.file_path for batch, _ in results} == {"a.py", "c.py"}

@patch(MOCK_PATCH_TARGET, _mock_get_chat_model)
async def test_all_batches_failed_returns_empty(self) -> None:
analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL)
analyzer._structured_llm.ainvoke = AsyncMock(side_effect=RuntimeError("boom"))
batches = [Batch(file_path="a.py", content="code")]
assert await analyzer.arun_batches(batches) == []

@patch(MOCK_PATCH_TARGET, _mock_get_chat_model)
async def test_value_error_still_propagates(self) -> None:
"""ValueError signals misconfiguration, not infra trouble — never swallowed."""
analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL)
analyzer._structured_llm.ainvoke = AsyncMock(side_effect=ValueError("no API key"))
batches = [Batch(file_path="a.py", content="code")]
with pytest.raises(ValueError, match="no API key"):
await analyzer.arun_batches(batches)


# ---------------------------------------------------------------------------
# _format_findings_for_prompt (per-file, no truncation)
Expand Down
148 changes: 148 additions & 0 deletions tests/nodes/test_meta_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the meta_analyzer node: batch-failure resilience of the filter.

Covers the keep/drop semantics when Stage 2 batches fail (issues #9/#11):
a finding the LLM rejected is dropped, a finding the LLM never saw is kept
via the fallback path, and one failed batch must not disable filtering for
the batches that succeeded.
"""

from __future__ import annotations

from unittest.mock import AsyncMock, patch

from skillspector.llm_analyzer_base import Batch
from skillspector.models import Finding
from skillspector.nodes.meta_analyzer import LLMMetaAnalyzer, meta_analyzer

MOCK_PATCH_TARGET = "skillspector.llm_analyzer_base.get_chat_model"


def _mock_get_chat_model(*_args, **_kwargs):
from unittest.mock import MagicMock

mock_llm = MagicMock()
mock_llm.with_structured_output.return_value = MagicMock()
return mock_llm


def _confirm(pattern_id: str, file: str, start_line: int) -> dict[str, object]:
"""LLM item confirming a finding, as parse_response would emit it."""
return {
"pattern_id": pattern_id,
"is_vulnerability": True,
"confidence": 0.9,
"explanation": "confirmed by llm",
"remediation": "fix it",
"_file": file,
"start_line": start_line,
"end_line": None,
}


@patch(MOCK_PATCH_TARGET, _mock_get_chat_model)
class TestMetaAnalyzerPartialBatchFailure:
def _state(self, findings: list[Finding]) -> dict[str, object]:
return {
"findings": findings,
"use_llm": True,
"file_cache": {"a.py": "code a", "b.py": "code b"},
"manifest": {},
"model_config": {},
}

def test_unanalysed_findings_survive_a_failed_batch(self) -> None:
"""Findings whose batch failed are kept (no verdict != rejection)."""
f_confirmed = Finding(rule_id="R1", message="m", file="a.py", start_line=1)
f_rejected = Finding(rule_id="R2", message="m", file="a.py", start_line=5)
f_unseen = Finding(rule_id="R1", message="m", file="b.py", start_line=3)

batch_a = Batch(file_path="a.py", content="code a", findings=[f_confirmed, f_rejected])
batch_b = Batch(file_path="b.py", content="code b", findings=[f_unseen])

# batch_b never returned (timeout/429): only batch_a's verdicts exist,
# and the LLM confirmed R1 but stayed silent on R2 (= rejection).
partial_results = [(batch_a, [_confirm("R1", "a.py", 1)])]

with (
patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]),
patch.object(
LLMMetaAnalyzer,
"arun_batches",
new_callable=AsyncMock,
return_value=partial_results,
),
):
result = meta_analyzer(self._state([f_confirmed, f_rejected, f_unseen]))

filtered = result["filtered_findings"]
kept = {(f.file, f.rule_id) for f in filtered}

# the real filter still applies to the batch that came back
assert ("a.py", "R1") in kept
assert ("a.py", "R2") not in kept
# the finding the LLM never saw must NOT be silently dropped
assert ("b.py", "R1") in kept

confirmed = next(f for f in filtered if f.file == "a.py")
assert confirmed.explanation == "confirmed by llm"

def test_all_batches_failed_keeps_everything_via_fallback(self) -> None:
f1 = Finding(rule_id="R1", message="m", file="a.py", start_line=1)
f2 = Finding(rule_id="R2", message="m", file="b.py", start_line=2)
batch_a = Batch(file_path="a.py", content="code a", findings=[f1])
batch_b = Batch(file_path="b.py", content="code b", findings=[f2])

with (
patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]),
patch.object(
LLMMetaAnalyzer,
"arun_batches",
new_callable=AsyncMock,
return_value=[],
),
):
result = meta_analyzer(self._state([f1, f2]))

kept = {(f.file, f.rule_id) for f in result["filtered_findings"]}
assert kept == {("a.py", "R1"), ("b.py", "R2")}

def test_no_failures_keeps_strict_confirm_or_drop(self) -> None:
"""When every batch returns, unconfirmed findings are dropped as before."""
f_confirmed = Finding(rule_id="R1", message="m", file="a.py", start_line=1)
f_rejected = Finding(rule_id="R2", message="m", file="b.py", start_line=2)
batch_a = Batch(file_path="a.py", content="code a", findings=[f_confirmed])
batch_b = Batch(file_path="b.py", content="code b", findings=[f_rejected])

full_results = [
(batch_a, [_confirm("R1", "a.py", 1)]),
(batch_b, []),
]

with (
patch.object(LLMMetaAnalyzer, "get_batches", return_value=[batch_a, batch_b]),
patch.object(
LLMMetaAnalyzer,
"arun_batches",
new_callable=AsyncMock,
return_value=full_results,
),
):
result = meta_analyzer(self._state([f_confirmed, f_rejected]))

kept = {(f.file, f.rule_id) for f in result["filtered_findings"]}
assert kept == {("a.py", "R1")}