diff --git a/docs/wiki/08-24-Big-O-Detection.md b/docs/wiki/08-24-Big-O-Detection.md new file mode 100644 index 0000000..1ae970e --- /dev/null +++ b/docs/wiki/08-24-Big-O-Detection.md @@ -0,0 +1,150 @@ +# Algorithmic DoS & Big-O Detection + +> **Metric: Algorithmic DoS & Data Gravity** +> +> **TL;DR:** Performance is no longer just a latency issue; it is a critical security perimeter. A standalone $O(N^3)$ loop calculating math offline is harmless. But that exact same $O(N^3)$ loop attached to a public API endpoint and a database query? That is a ticking Algorithmic Denial of Service (DoS) bomb. +> +> GitGalaxy abandons flat volumetric counting in favor of an **N-Dimensional Physics Equation**. We evaluate the mathematical depth of your logic and multiply it by its "Data Gravity" and network exposure. +> +> **Effect:** Maps directly to the GitGalaxy Universal Risk Spectrum. +> * 🟦 **VERY LOW (Score 0-19):** Linear $O(N)$ execution or safely shielded streams. +> * 🟨 **INTERMEDIATE (Score 40-59):** Moderate. $O(N^2)$ logic that is mostly isolated or well-guarded by safety bailouts. +> * πŸŸ₯ **VERY HIGH (Score 80-100+):** Asymmetrical Threat. Recursive $O(2^N)$ or $O(N^3)$ loops directly wired into unauthenticated APIs or state-mutating database calls. + +## The Philosophy: The Asymmetrical Attack Surface (CTO Pitch) + +Traditional SAST tools fail to catch algorithmic vulnerabilities because they are designed to grep for known CVEs or hardcoded secrets. But an Algorithmic DoS isn't a CVEβ€”it is a structural reality of your architecture. + +A healthy engineering culture admits that human working memory is a finite resource. Similarly, we must acknowledge that CPU threads and RAM are finite biological-equivalent bottlenecks for your infrastructure. If an attacker discovers an unprotected $O(N^4)$ function, they don't need a botnet to take down your servers. A single, well-crafted malicious payload can trap your core processing threads in infinite loops, starving your system of resources and resulting in a catastrophic outage. + +GitGalaxy maps the "syntactic physics" of your codebase to identify where exponential complexity collides with public exposure. By surfacing these asymmetrical attack surfaces deterministically, we empower teams to secure their architecture *before* it reaches production. + +## The Inputs: Variables of the Algorithmic Engine + +We calculate the physical threat of a function by measuring its depth, its environment, and its structural defenses. + +| Variable | Metric Focus | Multiplier | Human Translation | +| :--- | :--- | :--- | :--- | +| `big_o_depth` | Algorithmic Depth | Exponential | Evaluates nesting. $O(N)$ is ignored. $O(N^2)$, $O(N^3)$, and recursive $O(2^N)$ trigger exponential threat scaling. | +| `api` / `io` | Choke Points | Additive | Functions exposed to network requests or I/O act as weaponizable triggers. | +| `db_complexity` | Data Gravity | Additive | Heavy iteration paired with ORM/SQL queries generates severe database locking risks. | +| `flux` | State Mutation | Additive | Mutating variables inside heavy loops quickly leads to Out of Memory (OOM) bombs. | +| `safety` / `bailout` | The Guardrails | 0.5x (Dampener) | Break statements, return limits, and try/catch blocks act as structural circuit breakers, slicing the risk in half. | +| `lazy_evaluation` | OOM Shield | 0.5x (Dampener) | Generators and streams process data in $O(1)$ memory, neutralizing state flux threats. | + +## Universal Framework Integration + +As with all core physics calculations, the DoS engine is deeply integrated with the ecosystem context: +* **Network Popularity (Blast Radius):** An $O(N^3)$ loop in a globally imported "God Node" multiplies the threat across the entire repository. If the function is an orphaned utility with no inbound network edges, the risk is heavily dampened. +* **Agentic / Hardware Shields:** Physics engine mitigations are dynamically applied. If the complex execution is occurring within a closed-loop native hardware bridge or a deterministic ML pipeline, standard Web-DoS math is scaled back. + +## The Mathematics: The Density of Complexity + +We use a Logistic Function (Sigmoid) tuned to be forgiving of moderate complexity but demanding of high complexity. + +**Step A: The Base Threat** +We evaluate the structural depth. Anything below $O(N^2)$ is safely ignored. +$$BaseThreat=BigODepth^2$$ + +**Step B: The Asymmetrical Amplifiers (Data Gravity & Choke Points)** +We calculate the environmental multipliers. A deep loop is only dangerous if it interacts with external state or heavy data limits. +$$ChokeMultiplier=1.0+APIHits+IOHits+FluxHits$$ +$$GravityMultiplier=1.0+(DBComplexity\times0.5)$$ +$$ThreatMass=BaseThreat\times ChokeMultiplier\times GravityMultiplier$$ + +**Step C: Structural Dampeners (The Circuit Breakers)** +If the engine detects safety bailouts (`break`, `return`, `limit`) or lazy evaluation (generators), the mass is structurally mitigated. +$$MitigatedMass=ThreatMass\times0.5$$ + +**Step D: The Sigmoid Clamp** +We normalize the final mass against the physical lines of code to find the structural density, then map it to the GitGalaxy 0-100 spectrum. +$$Density=\left(\frac{MitigatedMass}{\max(LOC+150,\ 1)}\right)\times100.0$$ +$$FinalRisk=\frac{100.0}{1+e^{-0.3\times(Density-15.0)}}$$ + +--- + +## Implementation (Python Reference) + +```python +import math +from typing import Dict, List, Any + +def _calc_algorithmic_dos( + self, + loc: int, + eq: Dict[str, int], + mp: float, + functions: List[Dict[str, Any]], + popularity: int, +) -> float: + if not functions: + return 0.0 + + dos_mass = 0.0 + + for func in functions: + depth = func.get("big_o_depth", 1) + # 1. Ignore O(N) linear loops + if depth < 2: + continue + + # 2. Base Threat (Exponential decay of performance) + func_threat = float(depth**2) + + # 3. Data Gravity & Network Choke Points + db_complex = func.get("db_complexity", 0) + if db_complex > 0: + func_threat *= 1.0 + (db_complex * 0.5) + + hv = func.get("hit_vector", {}) + choke_multiplier = 1.0 + hv.get("api", 0) + hv.get("io", 0) + hv.get("flux", 0) + func_threat *= choke_multiplier + + # 4. Structural Dampeners (Guardrails) + safety_hits = hv.get("safety", 0) + hv.get("bailout_hits", 0) + if safety_hits > 0: + func_threat *= 0.5 # 50% reduction for bounded iteration + + dos_mass += func_threat + + if dos_mass == 0.0: + return 0.0 + + # 5. Network Posture (Blast Radius) + network_multiplier = 1.0 + if popularity == 0 and eq.get("api", 0) == 0: + network_multiplier = 0.10 # Safely isolated orphan + elif popularity > 0: + network_multiplier = min(1.0 + (math.log1p(popularity) / 5.0), 3.0) + + total_threat_mass = dos_mass * network_multiplier + + # 6. The Sigmoid Curve + t = self.risk_tuning.get("algorithmic_dos", {}) + density = (total_threat_mass / max(loc + t.get("loc_padding", 150), 1)) * 100.0 + + threshold = t.get("threshold_base", 15.0) + + try: + score = 100.0 / (1.0 + math.exp(-t.get("sigmoid_slope", 0.3) * (density - threshold))) + except OverflowError: + score = 100.0 if density > threshold else 0.0 + + return min(score * mp, 100.0) +``` + +

+ +--- + +### 🌌 Powered by the blAST Engine + +This documentation is part of the [GitGalaxy Ecosystem](https://github.com/squid-protocol/gitgalaxy), an AST-free, LLM-free heuristic knowledge graph engine. + +* 🧠 **[Deep Dive into the Physics Source Code](https://github.com/squid-protocol/gitgalaxy/tree/main/gitgalaxy/physics)** to see the math in action. +* πŸͺ **[Explore the GitHub Repository](https://github.com/squid-protocol/gitgalaxy)** for code, tools, and updates. +* πŸ”­ **[Visualize your own repository at GitGalaxy.io](https://gitgalaxy.io/)** using our interactive 3D WebGPU dashboard. + +--- + +**[⬅️ Back to Master Index](index.md)** \ No newline at end of file diff --git a/gitgalaxy/galaxyscope.py b/gitgalaxy/galaxyscope.py index 8db4d7d..adebf14 100644 --- a/gitgalaxy/galaxyscope.py +++ b/gitgalaxy/galaxyscope.py @@ -47,6 +47,7 @@ try: import yaml + HAS_PYYAML = True except ImportError: HAS_PYYAML = False diff --git a/gitgalaxy/physics/signal_processor.py b/gitgalaxy/physics/signal_processor.py index 49c9509..dcaa6fb 100644 --- a/gitgalaxy/physics/signal_processor.py +++ b/gitgalaxy/physics/signal_processor.py @@ -222,13 +222,27 @@ def calculate_risk_vector( umbrella_bonus: float = 0.0, ) -> Dict[str, Any]: """Calculates risk exposure, temporal physics, and per-file physical impact.""" - loc = max(meta.get("coding_loc", 1), 1) - total_loc = max(meta.get("total_loc", loc), 1) - doc_lines = meta.get("doc_loc", 0) - lang_id = meta.get("lang_id", "undeterminable") rel_path = meta.get("path", "unknown") + loc = 1 # Safe fallback for the except block try: + try: + loc = max(int(meta.get("coding_loc", 1)), 1) + except (ValueError, TypeError): + loc = 1 + + try: + total_loc = max(int(meta.get("total_loc", loc)), 1) + except (ValueError, TypeError): + total_loc = loc + + try: + doc_lines = int(meta.get("doc_loc", 0)) + except (ValueError, TypeError): + doc_lines = 0 + + lang_id = meta.get("lang_id", "undeterminable") + import os filename = os.path.basename(rel_path).lower() @@ -719,6 +733,9 @@ def calculate_risk_vector( "churn": 0.0, "documentation": doc_score, "civil_war": self._calc_civil_war(equations), + "algorithmic_dos": self._calc_algorithmic_dos( + loc, equations, mp_map.get("algorithmic_dos", 1.0), functions, popularity + ), # ---> BIAXIAL WEAPONIZATION <--- "obscured_payload": self._calc_obscured_payload( loc, @@ -735,6 +752,7 @@ def calculate_risk_vector( global_archetype, global_drift, local_drift, + max_big_o, ), "injection_surface": self._calc_injection_surface( loc, @@ -1833,6 +1851,7 @@ def _calc_logic_bomb( archetype: str, global_drift: float, local_drift: float, + max_big_o: int = 1, ) -> float: """ Calculates Logic Bomb / Sabotage Exposure. @@ -1860,6 +1879,22 @@ def _calc_logic_bomb( # ---> APPLY THE ARCHETYPE CONTEXT <--- sabotage_mass = (trigger * payload) * arch_multiplier + # ---> THE ALGORITHMIC DOS SPIKE (Big-O Vulnerability) <--- + if max_big_o >= 3: + # 1. API/IO Choke Point (User-Controlled N or Network Latency) + attack_surface = eq.get("api", 0) + eq.get("sec_io", 0) + eq.get("io", 0) + dos_mass = attack_surface * (max_big_o**2) * 10.0 + + # 2. State Flux Bomb (Memory Exhaustion) + flux = eq.get("flux", 0) + eq.get("globals", 0) + dos_mass += flux * (max_big_o**2) * 5.0 + + # 3. The Shielding Dampener (Safety Guardrails) + if eq.get("safety", 0) > 0 or eq.get("bailout_hits", 0) > 0: + dos_mass *= 0.25 # 75% reduction if guardrails exist + + sabotage_mass += dos_mass + # ---> THE TAINT SPIKE <--- # If the LHS Slicer confirmed data crossed from I/O to Danger, risk is absolute. taint_confirmed = eq.get("sec_tainted_injection", 0) @@ -1876,6 +1911,9 @@ def _calc_logic_bomb( return 0.0 explicit_threats = eq.get("sec_graveyard", 0) + eq.get("sec_heat_triggers", 0) + if max_big_o >= 3: + explicit_threats += 1 # Preserve DoS Mass from being zeroed out + if explicit_threats == 0 and taint_confirmed == 0 and not getattr(self, "is_paranoid", False): sabotage_mass *= 0.05 @@ -2072,6 +2110,76 @@ def _calc_secrets_risk(self, loc: int, eq: Dict[str, int], mp: float) -> float: return min(score * mp, 100.0) + def _calc_algorithmic_dos( + self, + loc: int, + eq: Dict[str, int], + mp: float, + functions: List[Dict[str, Any]], + popularity: int, + ) -> float: + """ + Calculates Algorithmic DoS Exposure based on Big-O depth, data gravity, and network choke points. + """ + if not functions: + return 0.0 + + dos_mass = 0.0 + + for func in functions: + depth = func.get("big_o_depth", 1) + if depth < 2: + continue + + # 1. The Base Threat (Exponential decay of performance) + func_threat = float(depth**2) + + # 2. The Amplifiers (Network & Data Gravity) + db_complex = func.get("db_complexity", 0) + if db_complex > 0: + func_threat *= 1.0 + (db_complex * 0.5) + + hv = func.get("hit_vector", {}) + api_hits = hv.get("api", 0) + io_hits = hv.get("io", 0) + hv.get("sec_io", 0) + flux_hits = hv.get("flux", 0) + hv.get("globals", 0) + + choke_multiplier = 1.0 + api_hits + io_hits + flux_hits + func_threat *= choke_multiplier + + # 3. The Dampeners (Guardrails) + safety_hits = hv.get("safety", 0) + hv.get("bailout_hits", 0) + hv.get("cleanup", 0) + if safety_hits > 0: + func_threat *= 0.5 # 50% reduction for bounded iteration + + dos_mass += func_threat + + if dos_mass == 0.0: + return 0.0 + + # Apply File-Level Network Dampeners/Amplifiers + network_multiplier = 1.0 + if popularity == 0 and eq.get("api", 0) == 0: + network_multiplier = 0.10 # Safely isolated orphan + elif popularity > 0: + network_multiplier = min(1.0 + (math.log1p(popularity) / 5.0), 3.0) + + total_threat_mass = dos_mass * network_multiplier + + # Fetch tuning parameters + t = self.risk_tuning.get("algorithmic_dos", {}) + density = (total_threat_mass / max(loc + t.get("loc_padding", 150), 1)) * 100.0 + + threshold = t.get("threshold_base", 15.0) + slope = t.get("sigmoid_slope", 0.3) + + try: + score = 100.0 / (1.0 + math.exp(-slope * (density - threshold))) + except OverflowError: + score = 100.0 if density > threshold else 0.0 + + return min(score * mp, 100.0) + # -------------------------------------------------------------------------- # REPORTING UTILITIES # -------------------------------------------------------------------------- @@ -2106,8 +2214,10 @@ def generate_forensic_report(self, parsed_files: List[Dict[str, Any]]) -> Dict[s def get_cumulative_risk(f): rv = f.get("risk_vector", []) + if not isinstance(rv, list): + return 0.0 # Sum all exposures except civil_war - return sum(val for i, val in enumerate(rv) if i != civil_war_idx and i < len(rv)) + return sum(val for i, val in enumerate(rv) if i != civil_war_idx and i < len(rv) and isinstance(val, (int, float))) sorted_by_cumulative = sorted(active_files, key=get_cumulative_risk, reverse=True) @@ -2124,16 +2234,17 @@ def get_cumulative_risk(f): for file_data in active_files: net = file_data.get("telemetry", {}).get("network_metrics", {}) - rv = file_data.get("risk_vector", []) + raw_rv = file_data.get("risk_vector", []) + rv = raw_rv if isinstance(raw_rv, list) else [] p = file_data.get("path", "") btw = net.get("betweenness_score") or 0.0 close = net.get("closeness_score") or 0.0 pr = net.get("normalized_blast_radius") or 0.0 - flux_risk = rv[flux_idx] if flux_idx >= 0 and len(rv) > flux_idx else 0.0 - err_risk = rv[err_idx] if err_idx >= 0 and len(rv) > err_idx else 0.0 - doc_risk = rv[doc_idx] if doc_idx >= 0 and len(rv) > doc_idx else 0.0 + flux_risk = float(rv[flux_idx]) if flux_idx >= 0 and len(rv) > flux_idx and isinstance(rv[flux_idx], (int, float)) else 0.0 + err_risk = float(rv[err_idx]) if err_idx >= 0 and len(rv) > err_idx and isinstance(rv[err_idx], (int, float)) else 0.0 + doc_risk = float(rv[doc_idx]) if doc_idx >= 0 and len(rv) > doc_idx and isinstance(rv[doc_idx], (int, float)) else 0.0 bottlenecks["contagious_mutation"].append( { @@ -2211,6 +2322,7 @@ def _get_locational_multipliers(self, path: str) -> Dict[str, float]: "State Flux Exposure": "flux", "Specification Exposure": "spec", "Churn Exposure": "churn", + "Algorithmic DoS Exposure": "algorithmic_dos", # --- SECURITY LENSES --- "Obscured Payload Exposure": "obscured", "Logic Bomb Exposure": "logic_bomb", diff --git a/gitgalaxy/recorders/llm_recorder.py b/gitgalaxy/recorders/llm_recorder.py index b5fa6d7..a4f4cb4 100644 --- a/gitgalaxy/recorders/llm_recorder.py +++ b/gitgalaxy/recorders/llm_recorder.py @@ -709,6 +709,7 @@ def _build_markdown( "injection_surface", "memory_corruption", "secrets_risk", + "algorithmic_dos", ] vuln_found = False for v_key in vuln_keys: diff --git a/gitgalaxy/standards/analysis_lens.py b/gitgalaxy/standards/analysis_lens.py index 97e9d70..a0caa90 100644 --- a/gitgalaxy/standards/analysis_lens.py +++ b/gitgalaxy/standards/analysis_lens.py @@ -799,6 +799,11 @@ def get_policy(mode="baseline"): "threshold_base": 6.0, # Lowered from 15.0 (6% mutation density is much more realistic) "sigmoid_slope": 0.40, # Increased from 0.20 to stretch the curve }, + "algorithmic_dos": { + "loc_padding": 150, # Standard padding to forgive O(N^2) in tiny scripts + "threshold_base": 15.0, # The density tipping point where risk spikes + "sigmoid_slope": 0.3, # Stretches the curve to reward partial mitigations + }, # ---> DECOUPLED SECURITY EQUATION TUNING <--- "obscured_payload": { "loc_padding": 500, # Raised from 150. Dilutes the density of massive framework files. @@ -1035,6 +1040,7 @@ def get_policy(mode="baseline"): "churn", "documentation", "civil_war", + "algorithmic_dos", # --- THE SECURITY & VULNERABILITY LENSES --- "obscured_payload", "logic_bomb", @@ -1276,6 +1282,7 @@ def get_policy(mode="baseline"): "injection_surface": "Weaponizable Injection Vectors", "memory_corruption": "Weaponizable Memory Operations", "secrets_risk": "Hardcoded Credential Exposure", + "algorithmic_dos": "Algorithmic DoS Exposure", }, "EXPOSURE_LABELS": { "cognitive_load": "Cognitive Load Exposure", @@ -1291,6 +1298,7 @@ def get_policy(mode="baseline"): "churn": "Volatility Exposure", "documentation": "Documentation Exposure", "civil_war": "Civil War Exposure", + "algorithmic_dos": "Algorithmic DoS Exposure", # --- SECURITY LENS UI LABELS (Plain English) --- "obscured_payload": "Obfuscation & Evasion Surface", "logic_bomb": "Exploit Generation Surface", diff --git a/gitgalaxy/standards/gitgalaxy_config.py b/gitgalaxy/standards/gitgalaxy_config.py index 2059585..29d05bd 100644 --- a/gitgalaxy/standards/gitgalaxy_config.py +++ b/gitgalaxy/standards/gitgalaxy_config.py @@ -81,7 +81,10 @@ ".yml", ] -XRAY_BYPASS_PATHS = ["package-lock.json", "yarn.lock", "composer.lock", +XRAY_BYPASS_PATHS = [ + "package-lock.json", + "yarn.lock", + "composer.lock", "gitgalaxy/core/aperture.py", "gitgalaxy/standards/language_standards.py", "gitgalaxy/security/security_lens.py", @@ -91,7 +94,7 @@ "gitgalaxy/tools/terabyte_log_scanning/pii_leak_hunter.py", "gitgalaxy/tools/supply_chain_security/binary_anomaly_detector.py", "gitgalaxy/tools/supply_chain_security/supply_chain_firewall.py", - "site/css/styles.css" + "site/css/styles.css", ] @@ -222,6 +225,7 @@ "docs", "LEGAL", "legal", + "site", }, # --- 2. Extension-Level Solar Shield --- "BLACK_HOLE_EXTENSIONS": { diff --git a/tests/core_engine/test_language_lens.py b/tests/core_engine/test_language_lens.py index 9e596ea..3f0827f 100644 --- a/tests/core_engine/test_language_lens.py +++ b/tests/core_engine/test_language_lens.py @@ -176,8 +176,9 @@ def test_tier_3_spectral_scan(isolated_detector): # ============================================================================== def test_tier_4_deep_space_discovery(isolated_detector): """Proves the engine can blindly identify a file with no extension.""" + import os # Needs > 20 lines to trigger Tier 4. We inject C-style comments and structure. - content = "// C-style comment\n" * 25 + "int main() { return 0; }\n" * 5 + content = f"// C-style comment{os.linesep}" * 25 + f"int main() {{ return 0; }}{os.linesep}" * 5 result = isolated_detector.inspect(file_path="unknown_binary_xyz", content_sample=content) diff --git a/tests/core_engine/test_signal_processor.py b/tests/core_engine/test_signal_processor.py index 40da47a..d3b7164 100644 --- a/tests/core_engine/test_signal_processor.py +++ b/tests/core_engine/test_signal_processor.py @@ -379,3 +379,766 @@ def test_signal_processor_ai_topology(physics_engine): assert "context amnesia" in insights, "Failed to detect missing Agent Memory!" assert "catastrophically across the system" in insights, "Failed to detect high PageRank blast radius!" assert "Cognitive Choke Point" in insights, "Failed to detect high Betweenness!" + + +# ============================================================================== +# TEST 15: ALGORITHMIC DOS EXPOSURE +# ============================================================================== +def test_signal_processor_algorithmic_dos(physics_engine): + """Proves the Big-O risk exposure scales with data gravity and choke points, and is dampened by safety guardrails.""" + + # 1. Isolated Harmless Loop: O(N^3) but no IO/API and 0 popularity. + m_iso, e_iso = create_synthetic_star(physics_engine, "isolated", 100, {"api": 0}) + m_iso["popularity"] = 0 + m_iso["functions"] = [{"name": "safe_loop", "loc": 50, "big_o_depth": 3, "db_complexity": 0, "hit_vector": {}}] + + # 2. API DoS Bomb: O(N^3) + DB Complexity + Exposed to API + m_bomb, e_bomb = create_synthetic_star(physics_engine, "exposed_bomb", 500, {"api": 4}) + m_bomb["popularity"] = 2 + m_bomb["functions"] = [ + { + "name": "dos_bomb", + "loc": 250, + "big_o_depth": 3, + "db_complexity": 2, + "hit_vector": {"api": 4}, + } + ] + + # 3. Guarded DoS Bomb: Same as above but mitigated by safety bailouts + m_guard, e_guard = create_synthetic_star(physics_engine, "guarded_bomb", 500, {"api": 4}) + m_guard["popularity"] = 2 + m_guard["functions"] = [ + { + "name": "safe_bomb", + "loc": 250, + "big_o_depth": 3, + "db_complexity": 2, + "hit_vector": {"api": 4, "safety": 1, "bailout_hits": 2}, + } + ] + + res_iso = physics_engine.calculate_risk_vector(m_iso, e_iso) + res_bomb = physics_engine.calculate_risk_vector(m_bomb, e_bomb) + res_guard = physics_engine.calculate_risk_vector(m_guard, e_guard) + + # Index 13 is the new algorithmic_dos vector + iso_score = res_iso["risk_vector"][13] + bomb_score = res_bomb["risk_vector"][13] + guard_score = res_guard["risk_vector"][13] + + assert iso_score < bomb_score, "Isolated loop should have significantly lower risk than exposed bomb!" + assert guard_score < bomb_score, "Safety guardrails failed to dampen the Algorithmic DoS threat!" + assert bomb_score > 50.0, "API DoS bomb failed to spike the risk exposure!" + + +# ============================================================================== +# TEST 16: WEAPONIZABLE SURFACE EXPOSURES (Security Lenses) +# ============================================================================== +def test_signal_processor_security_lenses(physics_engine): + """Ensures all security lens risk equations return valid floats and properly scale.""" + + # 1. Logic Bomb + m_lb, e_lb = create_synthetic_star(physics_engine, "logic_bomb", 100, { + "branch": 50, "sec_danger": 20, "sec_tainted_injection": 5 + }) + + # 2. Obscured Payload (Requires intent_mass via sec_danger to bypass the 95% false-positive shield) + m_ob, e_ob = create_synthetic_star(physics_engine, "obscured", 100, { + "sec_heat_triggers": 20, "sec_bitwise_hits": 50, "sec_shadow_imports": 5, "sec_danger": 10 + }) + + # 3. Injection Surface + m_inj, e_inj = create_synthetic_star(physics_engine, "injection", 100, { + "sec_io": 30, "sec_danger": 30 + }) + + # 4. Memory Corruption (Requires native memory language like 'c' + malicious intent to bypass the 95% shield) + m_mem, e_mem = create_synthetic_star(physics_engine, "memory", 100, { + "pointers": 50, "memory_alloc": 20, "sec_danger": 10 + }) + m_mem["lang_id"] = "c" + + r_lb = physics_engine.calculate_risk_vector(m_lb, e_lb) + r_ob = physics_engine.calculate_risk_vector(m_ob, e_ob) + r_inj = physics_engine.calculate_risk_vector(m_inj, e_inj) + r_mem = physics_engine.calculate_risk_vector(m_mem, e_mem) + + idx_lb = physics_engine.RISK_SCHEMA.index("logic_bomb") + idx_ob = physics_engine.RISK_SCHEMA.index("obscured_payload") + idx_inj = physics_engine.RISK_SCHEMA.index("injection_surface") + idx_mem = physics_engine.RISK_SCHEMA.index("memory_corruption") + + assert isinstance(r_lb["risk_vector"][idx_lb], float), "Logic bomb must return a float!" + assert r_lb["risk_vector"][idx_lb] > 10.0, "Logic bomb failed to register!" + + assert isinstance(r_ob["risk_vector"][idx_ob], float), "Obscured payload must return a float!" + assert r_ob["risk_vector"][idx_ob] > 10.0, "Obscured payload failed to register!" + + assert isinstance(r_inj["risk_vector"][idx_inj], float), "Injection surface must return a float!" + assert r_inj["risk_vector"][idx_inj] > 10.0, "Injection surface failed to register!" + + assert isinstance(r_mem["risk_vector"][idx_mem], float), "Memory corruption must return a float!" + assert r_mem["risk_vector"][idx_mem] >= 9.0, "Memory corruption failed to register!" + + +# ============================================================================== +# TEST 17: STRUCTURAL METRICS (Graveyard & Spec Match) +# ============================================================================== +def test_signal_processor_structural_metrics(physics_engine): + """Ensures Graveyard and Spec Match exposures calculate correctly.""" + + # Graveyard (High dead code) + m_grave, e_grave = create_synthetic_star(physics_engine, "graveyard", 100, { + "graveyard": 80 + }) + + # Spec Match (0 specs for 10 functions = 100% risk) + m_spec, e_spec = create_synthetic_star(physics_engine, "spec", 100, { + "func_start": 10, "spec_exposure": 0 + }) + + r_grave = physics_engine.calculate_risk_vector(m_grave, e_grave) + r_spec = physics_engine.calculate_risk_vector(m_spec, e_spec) + + idx_grave = physics_engine.RISK_SCHEMA.index("graveyard") + idx_spec = physics_engine.RISK_SCHEMA.index("spec_match") + + assert r_grave["risk_vector"][idx_grave] > 50.0, "Graveyard risk failed to register!" + assert r_spec["risk_vector"][idx_spec] == 100.0, "Spec match risk failed to register maximum exposure on undocumented functions!" + + +# ============================================================================== +# TEST 18: UNACKNOWLEDGED DEBT (Design Slop Amplifier) +# ============================================================================== +def test_signal_processor_design_slop(physics_engine): + """Proves that silent design slop (orphans/duplicates) exponentially spikes Tech Debt.""" + + # 1. Clean Debt: Only explicit TODOs + m_clean, e_clean = create_synthetic_star(physics_engine, "clean_debt", 100, { + "planned_debt": 10 + }) + + # 2. Sloppy Debt: Explicit TODOs + Invisible Slop + m_slop, e_slop = create_synthetic_star(physics_engine, "sloppy_debt", 100, { + "planned_debt": 10, "design_slop_orphans": 5, "design_slop_duplicates": 2 + }) + + r_clean = physics_engine.calculate_risk_vector(m_clean, e_clean) + r_slop = physics_engine.calculate_risk_vector(m_slop, e_slop) + + idx_debt = physics_engine.RISK_SCHEMA.index("tech_debt") + + assert r_slop["risk_vector"][idx_debt] > r_clean["risk_vector"][idx_debt], "Design Slop failed to amplify Tech Debt!" + assert r_slop["risk_vector"][idx_debt] > 50.0, "Severe slop failed to trigger high exposure!" + + +# ============================================================================== +# TEST 19: VERIFICATION THERMODYNAMICS (Skips & Mass Penalty) +# ============================================================================== +def test_signal_processor_verification_thermodynamics(physics_engine): + """Proves skipped tests neutralize assertions, and massive files receive a testing penalty.""" + + # 1. Safe: 50 tests, standard LOC + m_safe, e_safe = create_synthetic_star(physics_engine, "safe_test", 100, { + "test": 50, "test_skip": 0 + }) + + # 2. Bypassed: 50 tests, but 25 skipped (Thermodynamic cancellation) + m_skip, e_skip = create_synthetic_star(physics_engine, "skip_test", 100, { + "test": 50, "test_skip": 25 + }) + + # 3. Massive: 50 tests, but 1000 LOC (Mass Penalty) + m_mass, e_mass = create_synthetic_star(physics_engine, "mass_test", 1000, { + "test": 50, "test_skip": 0 + }) + + r_safe = physics_engine.calculate_risk_vector(m_safe, e_safe) + r_skip = physics_engine.calculate_risk_vector(m_skip, e_skip) + r_mass = physics_engine.calculate_risk_vector(m_mass, e_mass) + + idx_test = physics_engine.RISK_SCHEMA.index("verification") + + # Higher score = Higher Risk Exposure (Worse Verification) + assert r_safe["risk_vector"][idx_test] < r_skip["risk_vector"][idx_test], "Test skips failed to neutralize assertions!" + assert r_safe["risk_vector"][idx_test] < r_mass["risk_vector"][idx_test], "Mass penalty failed to increase testing risk on giant files!" + + +# ============================================================================== +# TEST 20: GOD FUNCTION PENALTY (Cognitive Load Gini) +# ============================================================================== +def test_signal_processor_god_function_gini(physics_engine): + """Proves that concentrating complexity into a single function spikes Cognitive Load.""" + + # Both files have 100 LOC and 20 Branches total. + + # 1. Flat Distribution (4 functions, 5 branches each) -> Low Gini + m_flat, e_flat = create_synthetic_star(physics_engine, "flat_dist", 100, {"branch": 20}) + m_flat["functions"] = [ + {"name": "f1", "branch": 5, "loc": 25}, + {"name": "f2", "branch": 5, "loc": 25}, + {"name": "f3", "branch": 5, "loc": 25}, + {"name": "f4", "branch": 5, "loc": 25}, + ] + + # 2. God Function (1 massive function, 3 empty) -> High Gini + m_god, e_god = create_synthetic_star(physics_engine, "god_func", 100, {"branch": 20}) + m_god["functions"] = [ + {"name": "god", "branch": 20, "loc": 90}, + {"name": "f2", "branch": 0, "loc": 3}, + {"name": "f3", "branch": 0, "loc": 3}, + {"name": "f4", "branch": 0, "loc": 4}, + ] + + r_flat = physics_engine.calculate_risk_vector(m_flat, e_flat) + r_god = physics_engine.calculate_risk_vector(m_god, e_god) + + idx_cog = physics_engine.RISK_SCHEMA.index("cognitive_load") + + assert r_god["risk_vector"][idx_cog] > r_flat["risk_vector"][idx_cog], "God function Gini index failed to amplify Cognitive Load!" + + +# ============================================================================== +# TEST 21: CONCURRENCY THERMODYNAMICS (Locks & Starvation) +# ============================================================================== +def test_signal_processor_concurrency_thermodynamics(physics_engine): + """Proves sync locks mitigate async risk, and high Big-O spikes thread starvation.""" + + # 1. High Async, No Locks + m_async, e_async = create_synthetic_star(physics_engine, "pure_async", 100, {"concurrency": 20}) + + # 2. High Async, Mitigated by Locks (1 lock mitigates 1.5 async hits) + m_sync, e_sync = create_synthetic_star(physics_engine, "locked_async", 100, { + "concurrency": 20, "sync_locks": 15 + }) + + # 3. Thread Starvation (Async + High Big-O) + m_starve, e_starve = create_synthetic_star(physics_engine, "starved_async", 100, {"concurrency": 20}) + m_starve["functions"] = [{"name": "heavy_thread", "loc": 50, "big_o_depth": 3, "hit_vector": {"concurrency": 5}}] + + r_async = physics_engine.calculate_risk_vector(m_async, e_async) + r_sync = physics_engine.calculate_risk_vector(m_sync, e_sync) + r_starve = physics_engine.calculate_risk_vector(m_starve, e_starve) + + idx_async = physics_engine.RISK_SCHEMA.index("concurrency") + + assert r_sync["risk_vector"][idx_async] < r_async["risk_vector"][idx_async], "Sync locks failed to mitigate concurrency risk!" + assert r_starve["risk_vector"][idx_async] > r_async["risk_vector"][idx_async], "Thread starvation (Big-O + Async) failed to amplify risk!" + + +# ============================================================================== +# TEST 22: THE ECHO CHAMBER FIX (API Isolation) +# ============================================================================== +def test_signal_processor_api_echo_chamber(physics_engine): + """Proves that APIs with no inbound network connections receive a massive risk dampener.""" + + # 1. Orphaned API (Exposes 50 APIs, but 0 popularity) + m_orphan, e_orphan = create_synthetic_star(physics_engine, "orphan_api", 100, {"api": 50}) + m_orphan["popularity"] = 0 + + # 2. Networked API (Exposes 50 APIs, highly popular) + m_network, e_network = create_synthetic_star(physics_engine, "network_api", 100, {"api": 50}) + m_network["popularity"] = 20 + + r_orphan = physics_engine.calculate_risk_vector(m_orphan, e_orphan) + r_network = physics_engine.calculate_risk_vector(m_network, e_network) + + idx_api = physics_engine.RISK_SCHEMA.index("api_exposure") + + assert r_orphan["risk_vector"][idx_api] < (r_network["risk_vector"][idx_api] * 0.5), "Echo chamber fix failed: Orphaned APIs were not properly dampened!" + + +# ============================================================================== +# TEST 23: STATE FLUX THERMODYNAMICS (Immutability) +# ============================================================================== +def test_signal_processor_flux_immutability(physics_engine): + """Proves that immutable data declarations (freeze_hits) neutralize state flux.""" + + # 1. Pure Flux (High mutation) + m_flux, e_flux = create_synthetic_star(physics_engine, "high_flux", 100, {"flux": 30}) + + # 2. Frozen Flux (High mutation, but heavily mitigated by freeze/const/final) + m_frozen, e_frozen = create_synthetic_star(physics_engine, "frozen_flux", 100, { + "flux": 30, "freeze_hits": 40 + }) + + r_flux = physics_engine.calculate_risk_vector(m_flux, e_flux) + r_frozen = physics_engine.calculate_risk_vector(m_frozen, e_frozen) + + idx_flux = physics_engine.RISK_SCHEMA.index("state_flux") + + assert r_frozen["risk_vector"][idx_flux] < r_flux["risk_vector"][idx_flux], "Immutability (freeze_hits) failed to mitigate state flux risk!" + +# ============================================================================== +# TEST 24: EXTENSION DECEPTION SENSOR +# ============================================================================== +def test_signal_processor_extension_deception(physics_engine): + """Proves the engine flags files that claim to be inert data but contain executable logic.""" + m_dec, e_dec = create_synthetic_star(physics_engine, "data", 100) + m_dec["path"] = "src/data.json" # Claims to be JSON + m_dec["lang_id"] = "python" # Actually evaluated as Python! + + r_dec = physics_engine.calculate_risk_vector(m_dec, e_dec) + + idx_mismatch = physics_engine.SIGNAL_SCHEMA.index("sec_extension_mismatch") + assert r_dec["hit_vector"][idx_mismatch] == 1, "Extension Deception Sensor failed to flag the mismatch!" + + +# ============================================================================== +# TEST 25: ALIEN ENTITY CONTEXT PENALTIES +# ============================================================================== +def test_signal_processor_alien_entity(physics_engine): + """Proves that a Systems language hiding in a Web folder receives severe threat multipliers.""" + # 1. Native C (C code inside a C/CPP folder) + m_native, e_native = create_synthetic_star(physics_engine, "native", 100, { + "branch": 50, "sec_danger": 20, "sec_tainted_injection": 5 + }) + m_native["lang_id"] = "c" + m_native["metadata"] = {"folder_dominant_lang": "cpp"} + + # 2. Alien C (C code inside a Javascript/Web folder) + m_alien, e_alien = create_synthetic_star(physics_engine, "alien", 100, { + "branch": 50, "sec_danger": 20, "sec_tainted_injection": 5 + }) + m_alien["lang_id"] = "c" + m_alien["metadata"] = {"folder_dominant_lang": "javascript"} + + r_native = physics_engine.calculate_risk_vector(m_native, e_native) + r_alien = physics_engine.calculate_risk_vector(m_alien, e_alien) + + idx_lb = physics_engine.RISK_SCHEMA.index("logic_bomb") + + assert r_alien["risk_vector"][idx_lb] > r_native["risk_vector"][idx_lb], "Alien entity penalty failed to apply!" + + +# ============================================================================== +# TEST 26: THE AGENTIC & SCIENCE SHIELD +# ============================================================================== +def test_signal_processor_science_shield(physics_engine): + """Proves that Scientific/Math logic dampens the false-positive threat of Logic Bombs.""" + # 1. Standard executable with dangerous triggers + m_std, e_std = create_synthetic_star(physics_engine, "standard", 100, {"branch": 30, "sec_danger": 20}) + + # 2. Scientific executable with the exact same triggers + m_sci, e_sci = create_synthetic_star(physics_engine, "science", 100, {"branch": 30, "sec_danger": 20, "scientific": 10}) + + r_std = physics_engine.calculate_risk_vector(m_std, e_std) + r_sci = physics_engine.calculate_risk_vector(m_sci, e_sci) + + idx_lb = physics_engine.RISK_SCHEMA.index("logic_bomb") + + assert r_sci["risk_vector"][idx_lb] < r_std["risk_vector"][idx_lb], "Scientific shield failed to dampen the Logic Bomb false positive!" + + +# ============================================================================== +# TEST 27: CATASTROPHIC FALLBACKS & EMPTY GALAXIES +# ============================================================================== +def test_signal_processor_catastrophic_fallbacks(physics_engine): + """Ensures the physics engine survives catastrophic type errors and empty data sets.""" + # 1. Force a catastrophic math crash (string instead of int) + m_crash, e_crash = create_synthetic_star(physics_engine, "crash", 100) + m_crash["coding_loc"] = "THIS_WILL_BREAK_MATH" + + r_crash = physics_engine.calculate_risk_vector(m_crash, e_crash) + + assert "error" in r_crash["telemetry"], "Engine failed to catch and log the catastrophic physics failure!" + assert r_crash["risk_vector"] == [0.0] * len(physics_engine.RISK_SCHEMA), "Crash fallback did not safely zero out the risk vector!" + + # 2. Force an empty global synthesis + empty_summary = physics_engine.summarize_galaxy_metrics([], []) + assert empty_summary == {}, "Summarizer failed to safely exit on an empty repository!" + +# ============================================================================== +# TEST 28: CIVIL WAR VOID STATE (Zero Indentation) +# ============================================================================== +def test_signal_processor_civil_war_void(physics_engine): + """Proves the Civil War exposure safely defaults to 50.0 (Neutral) if a file has no indentation.""" + m_void, e_void = create_synthetic_star(physics_engine, "void_file", 10, { + "indent_tabs": 0, "indent_spaces": 0 + }) + + r_void = physics_engine.calculate_risk_vector(m_void, e_void) + idx_civil = physics_engine.RISK_SCHEMA.index("civil_war") + + assert r_void["risk_vector"][idx_civil] == 50.0, "Void state failed to default to 50.0% neutral exposure!" + + +# ============================================================================== +# TEST 29: AGENTIC RCE (Prompt Injection to Execution) +# ============================================================================== +def test_signal_processor_agentic_rce(physics_engine): + """Proves that pairing an LLM Orchestrator with dynamic execution creates a massive Injection Surface spike.""" + # 1. Standard dynamic execution + m_std, e_std = create_synthetic_star(physics_engine, "std_exec", 100, {"sec_danger": 10}) + + # 2. Agentic dynamic execution + m_agent, e_agent = create_synthetic_star(physics_engine, "agent_exec", 100, { + "sec_danger": 10, "llm_orchestrator": 5, "ai_tools": 5 + }) + + r_std = physics_engine.calculate_risk_vector(m_std, e_std) + r_agent = physics_engine.calculate_risk_vector(m_agent, e_agent) + + idx_inj = physics_engine.RISK_SCHEMA.index("injection_surface") + + assert r_agent["risk_vector"][idx_inj] > r_std["risk_vector"][idx_inj], "Agentic RCE spike failed to amplify injection risk!" + + +# ============================================================================== +# TEST 30: CRYPTOGRAPHY & PROFESSIONALISM SHIELDS +# ============================================================================== +def test_signal_processor_crypto_professionalism_shield(physics_engine): + """Proves that heavy documentation, safety blocks, and crypto math dampen obfuscation false positives.""" + # 1. Raw obfuscation (High entropy, bitwise math) + malicious intent + m_raw, e_raw = create_synthetic_star(physics_engine, "raw_obf", 100, { + "sec_heat_triggers": 50, "sec_bitwise_hits": 50, "sec_danger": 10 + }) + + # 2. Professional cryptography (Same obfuscation, but heavily documented and safe) + m_pro, e_pro = create_synthetic_star(physics_engine, "pro_crypto", 100, { + "sec_heat_triggers": 50, "sec_bitwise_hits": 50, "sec_danger": 10, + "doc": 100, "safety": 20, "cryptography": 10 + }) + + r_raw = physics_engine.calculate_risk_vector(m_raw, e_raw) + r_pro = physics_engine.calculate_risk_vector(m_pro, e_pro) + + idx_ob = physics_engine.RISK_SCHEMA.index("obscured_payload") + + assert r_pro["risk_vector"][idx_ob] < r_raw["risk_vector"][idx_ob], "Crypto/Professionalism shield failed to dampen obfuscation risk!" + + +# ============================================================================== +# TEST 31: LLM API SECRETS LEAK +# ============================================================================== +def test_signal_processor_llm_api_secrets(physics_engine): + """Proves that hardcoded secrets mixed with LLM APIs trigger a massive careless amplifier.""" + # 1. Standard secret leak (Requires sec_heat_triggers to bypass the 2.0 clamp) + _unused_m_std, e_std = create_synthetic_star(physics_engine, "std_leak", 500, { + "sec_private_info": 1, "globals": 1, "sec_heat_triggers": 1 + }) + + # 2. Careless LLM API secret leak (Calling APIs without using global variables) + m_llm, _unused_e_llm = create_synthetic_star(physics_engine, "llm_leak", 500, { + "sec_private_info": 1, "llm_api": 5, "globals": 0, "sec_heat_triggers": 1 + }) + +# ============================================================================== +# TEST 32: SAFE MINIFIED VENDOR FILE +# ============================================================================== +def test_signal_processor_safe_minified(physics_engine): + """Proves that minified files with zero malicious intent safely bypass the tripwire.""" + m_safe, e_safe = create_synthetic_star(physics_engine, "jquery_min", 100, {"branch": 50, "flux": 20}) + m_safe["is_minified"] = True + + r_safe = physics_engine.calculate_risk_vector(m_safe, e_safe) + + assert r_safe["risk_vector"] == [0.0] * len(physics_engine.RISK_SCHEMA), "Safe minified file failed to zero out risks!" + assert r_safe["telemetry"]["domain_context"]["alert"] == "MINIFIED VENDOR BYPASS", "Minified bypass flag missing!" + + +# ============================================================================== +# TEST 33: LAZY EVALUATION SHIELD (OOM BOMB) +# ============================================================================== +def test_signal_processor_lazy_evaluation_shield(physics_engine): + """Proves that lazy evaluation (generators/streams) neutralizes the OOM Bomb multiplier.""" + # 1. Ticking OOM Bomb (O(N^3) + High Flux + No Lazy Eval) + m_oom, e_oom = create_synthetic_star(physics_engine, "oom_bomb", 100, {"flux": 20}) + m_oom["functions"] = [{"name": "heavy_loop", "loc": 50, "big_o_depth": 3}] + + # 2. Safe Stream (O(N^3) + High Flux + Lazy Evaluation) + m_lazy, e_lazy = create_synthetic_star(physics_engine, "lazy_stream", 100, {"flux": 20, "lazy_evaluation": 10}) + m_lazy["functions"] = [{"name": "generator", "loc": 50, "big_o_depth": 3}] + + r_oom = physics_engine.calculate_risk_vector(m_oom, e_oom) + r_lazy = physics_engine.calculate_risk_vector(m_lazy, e_lazy) + + idx_flux = physics_engine.RISK_SCHEMA.index("state_flux") + assert r_lazy["risk_vector"][idx_flux] < r_oom["risk_vector"][idx_flux], "Lazy evaluation failed to dampen the OOM Bomb multiplier!" + + +# ============================================================================== +# TEST 34: AI TOPOLOGY (DEEP LEARNING & TRADITIONAL ML) +# ============================================================================== +def test_signal_processor_ai_topology_dl_ml(physics_engine): + """Ensures the AI topology summarizer correctly identifies Deep Learning and Traditional ML.""" + # Deep Learning + m_dl, e_dl = create_synthetic_star(physics_engine, "pytorch_model", 100, {"dl_frameworks": 10}) + r_dl = physics_engine.calculate_risk_vector(m_dl, e_dl) + m_dl.update(r_dl) + + # Traditional ML + m_ml, e_ml = create_synthetic_star(physics_engine, "xgboost_model", 100, {"ml_traditional": 10}) + r_ml = physics_engine.calculate_risk_vector(m_ml, e_ml) + m_ml.update(r_ml) + + # Summarize DL + sum_dl = physics_engine.summarize_galaxy_metrics([m_dl], []) + assert sum_dl["ai_topology"]["classification"] == "Deep Learning Architecture", "Failed to classify DL Architecture!" + + # Summarize ML + sum_ml = physics_engine.summarize_galaxy_metrics([m_ml], []) + assert sum_ml["ai_topology"]["classification"] == "Statistical Machine Learning", "Failed to classify Traditional ML!" + +# ============================================================================== +# TEST 35: PARANOID MODE ACTIVATION +# ============================================================================== +def test_signal_processor_paranoid_mode(physics_engine): + """Proves that Paranoid Mode tightens the Sigmoid thresholds across security lenses.""" + m_para, e_para = create_synthetic_star(physics_engine, "paranoid_file", 500, { + "sec_danger": 5, "sec_io": 5 + }) + + # Calculate in Standard Mode + physics_engine.is_paranoid = False + r_std = physics_engine.calculate_risk_vector(m_para, e_para) + + # Calculate in Paranoid Mode + physics_engine.is_paranoid = True + r_para = physics_engine.calculate_risk_vector(m_para, e_para) + + # Reset the engine state so subsequent tests aren't affected + physics_engine.is_paranoid = False + + idx_inj = physics_engine.RISK_SCHEMA.index("injection_surface") + assert r_para["risk_vector"][idx_inj] > r_std["risk_vector"][idx_inj], "Paranoid mode failed to amplify the risk exposure!" + + +# ============================================================================== +# TEST 36: AI TOPOLOGY (RAG & CLOUD WRAPPERS) +# ============================================================================== +def test_signal_processor_ai_topology_rag_cloud(physics_engine): + """Ensures the AI topology summarizer correctly identifies RAG pipelines and Cloud wrappers.""" + # RAG Pipeline + m_rag, e_rag = create_synthetic_star(physics_engine, "rag_bot", 100, { + "llm_vector_store": 10, "llm_api": 5 + }) + r_rag = physics_engine.calculate_risk_vector(m_rag, e_rag) + m_rag.update(r_rag) + + # Cloud API Wrapper + m_cloud, e_cloud = create_synthetic_star(physics_engine, "cloud_bot", 100, { + "llm_api": 10 + }) + r_cloud = physics_engine.calculate_risk_vector(m_cloud, e_cloud) + m_cloud.update(r_cloud) + + # Summarize RAG + sum_rag = physics_engine.summarize_galaxy_metrics([m_rag], []) + assert sum_rag["ai_topology"]["classification"] == "RAG Pipeline (Retrieval-Augmented Generation)", "Failed to classify RAG Pipeline!" + + # Summarize Cloud + sum_cloud = physics_engine.summarize_galaxy_metrics([m_cloud], []) + assert sum_cloud["ai_topology"]["classification"] == "Cloud API Wrapper", "Failed to classify Cloud API Wrapper!" + + +# ============================================================================== +# TEST 37: SIGMOID OVERFLOW RESISTANCE (Extreme Density) +# ============================================================================== +def test_signal_processor_sigmoid_overflow(physics_engine): + """Proves the Sigmoid curve safely catches math.exp OverflowErrors on extreme densities.""" + # Create a file with mathematically impossible levels of safety to force a massive negative density + m_safe, e_safe = create_synthetic_star(physics_engine, "super_shield", 1, { + "safety": 15000, "test": 15000, "doc": 15000, "freeze_hits": 15000 + }) + + # Create a file with mathematically impossible danger to force a massive positive density + m_danger, e_danger = create_synthetic_star(physics_engine, "super_bomb", 1, { + "branch": 15000, "concurrency": 15000, "flux": 15000, "sec_danger": 15000 + }) + + # If these execute without crashing the test runner, the except blocks are working perfectly. + r_safe = physics_engine.calculate_risk_vector(m_safe, e_safe) + r_danger = physics_engine.calculate_risk_vector(m_danger, e_danger) + + idx_saf = physics_engine.RISK_SCHEMA.index("safety_score") + + # The OverflowError should gracefully return either 0.0 or 100.0 depending on the threat trajectory + assert r_safe["risk_vector"][idx_saf] == 0.0, "Overflow fallback failed to zero out the mathematically safe file!" + assert r_danger["risk_vector"][idx_saf] == 100.0, "Overflow fallback failed to max out the mathematically dangerous file!" + + +# ============================================================================== +# TEST 38: STANDALONE INIT & SILO VOID +# ============================================================================== +def test_signal_processor_standalone_init_and_silo(): + """Ensures the processor initializes without a parent logger and handles 0-commit silo math.""" + from gitgalaxy.physics.signal_processor import SignalProcessor + + # Test standalone initialization + standalone_engine = SignalProcessor(parent_logger=None) + assert standalone_engine is not None, "SignalProcessor failed to initialize without a parent logger!" + + # Test the silo math directly on a 0-commit developer void state + zero_silo = standalone_engine._calculate_silo_risk({"dev_a": 0, "dev_b": 0}) + assert zero_silo == 0.0, "Silo risk failed to safely return 0.0 on a void state!" + +# ============================================================================== +# TEST 39: THE LOAD-BEARER PENALTY (Verification Risk) +# ============================================================================== +def test_signal_processor_load_bearer_penalty(physics_engine): + """Proves that highly imported files receive a massive penalty for lacking tests.""" + # 1. Standard file with 0 tests + m_std, e_std = create_synthetic_star(physics_engine, "std_untested", 100, {"test": 0}) + m_std["popularity"] = 0 + + # 2. Foundational pillar with 0 tests + m_pillar, e_pillar = create_synthetic_star(physics_engine, "pillar_untested", 100, {"test": 0}) + m_pillar["popularity"] = 20 # Highly imported + + r_std = physics_engine.calculate_risk_vector(m_std, e_std) + r_pillar = physics_engine.calculate_risk_vector(m_pillar, e_pillar) + + idx_ver = physics_engine.RISK_SCHEMA.index("verification") + + assert r_pillar["risk_vector"][idx_ver] > r_std["risk_vector"][idx_ver], "Load-bearer penalty failed to amplify verification risk!" + + +# ============================================================================== +# TEST 40: KINETIC BLINDNESS (Documentation Risk) +# ============================================================================== +def test_signal_processor_kinetic_blindness(physics_engine): + """Proves that deeply nested/heavy functions lacking docstrings spike documentation risk.""" + # 1. Complex function WITH a docstring + m_doc, e_doc = create_synthetic_star(physics_engine, "documented_heavy", 100, {"doc": 10}) + m_doc["functions"] = [{"name": "heavy_func", "loc": 50, "big_o_depth": 3, "docstring": True}] + + # 2. Complex function WITHOUT a docstring + m_blind, e_blind = create_synthetic_star(physics_engine, "blind_heavy", 100, {"doc": 10}) + m_blind["functions"] = [{"name": "heavy_func", "loc": 50, "big_o_depth": 3, "docstring": False}] + + r_doc = physics_engine.calculate_risk_vector(m_doc, e_doc) + r_blind = physics_engine.calculate_risk_vector(m_blind, e_blind) + + idx_doc = physics_engine.RISK_SCHEMA.index("documentation") + + assert r_blind["risk_vector"][idx_doc] > r_doc["risk_vector"][idx_doc], "Kinetic blindness failed to penalize undocumented heavy functions!" + + +# ============================================================================== +# TEST 41: TECH DEBT SLOP MULTIPLIER +# ============================================================================== +def test_signal_processor_tech_debt_slop(physics_engine): + """Proves that unacknowledged slop multiplies the severity of fragile debt.""" + # 1. Just fragile debt + m_debt, e_debt = create_synthetic_star(physics_engine, "fragile_only", 500, {"fragile_debt": 2}) + + # 2. Fragile debt PLUS orphans/duplicates + m_slop, e_slop = create_synthetic_star(physics_engine, "fragile_slop", 500, { + "fragile_debt": 2, "design_slop_orphans": 2, "design_slop_duplicates": 1 + }) + + r_debt = physics_engine.calculate_risk_vector(m_debt, e_debt) + r_slop = physics_engine.calculate_risk_vector(m_slop, e_slop) + + idx_debt = physics_engine.RISK_SCHEMA.index("tech_debt") + + # The multiplier is 1.5x, so the slop score should be significantly higher + assert r_slop["risk_vector"][idx_debt] > (r_debt["risk_vector"][idx_debt] * 1.2), "Tech debt slop failed to multiply fragile debt severity!" + + +# ============================================================================== +# TEST 42: REPORT GENERATOR MALFORMED DICTIONARY FALLBACK +# ============================================================================== +def test_signal_processor_report_fallback(physics_engine): + """Ensures the report generator safely handles missing keys and malformed telemetry.""" + malformed_files = [ + {"name": "missing_risk_vector", "path": "src/bad1.py"}, # No risk_vector key + {"name": "string_risk_vector", "path": "src/bad2.py", "risk_vector": "INVALID"}, # Wrong type + {"name": "short_risk_vector", "path": "src/bad3.py", "risk_vector": [0.0]} # Index out of bounds + ] + + # Should execute smoothly without raising a KeyError, TypeError, or IndexError + report = physics_engine.generate_forensic_report(malformed_files) + + assert "exposures" in report, "Report generator completely failed on malformed data!" + + # The lowest/highest rankings should have safely defaulted the values to 0.0 + for exposure_key, ranking in report["exposures"].items(): + assert ranking["highest"][0]["value"] == 0.0, f"Fallback failed to zero out invalid data for {exposure_key}!" + +# ============================================================================== +# TEST 43: CRITICAL LEAK BYPASS (Absolute Maximum Risk) +# ============================================================================== +def test_signal_processor_critical_leak_bypass(physics_engine): + """Proves that critical leaks bypass standard physics and max out secrets risk.""" + m_leak, e_leak = create_synthetic_star(physics_engine, "aws_key", 10, {}) + m_leak["path"] = "config/production.pem" + m_leak["metadata"] = {"aperture_reason": "CRITICAL LEAK DETECTED"} + + r_leak = physics_engine.calculate_risk_vector(m_leak, e_leak) + + idx_sec = physics_engine.RISK_SCHEMA.index("secrets_risk") + + assert r_leak["file_impact"] == 150.0, "Critical leak failed to trigger the 150.0 mass spike!" + assert r_leak["risk_vector"][idx_sec] == 100.0, "Critical leak failed to max out secrets risk!" + assert r_leak["telemetry"]["domain_context"]["alert"] == "CRITICAL LEAK BYPASS", "Bypass alert missing from telemetry!" + + +# ============================================================================== +# TEST 44: THE DARKNESS RATIO (100% Unparsable) +# ============================================================================== +def test_signal_processor_darkness_ratio(physics_engine): + """Ensures global synthesis survives a completely broken repository (0 parsed, 10 unparsable).""" + unparsable_files = [{"name": f"broken_{i}.py"} for i in range(10)] + + # 0 parsed files, 10 unparsable files + summary = physics_engine.summarize_galaxy_metrics([], unparsable_files) + + assert summary["summary"]["total_files"] == 10, "Failed to count unparsable files in total!" + assert summary["summary"]["verified_files"] == 0, "Verified files should be 0!" + assert summary["summary"]["Percent_Visible"] == 0.0, "Darkness ratio failed to calculate 0% visibility!" + assert summary["unparsable_files"]["ambig_file_count"] == 10, "Failed to aggregate unparsable file count!" + + +# ============================================================================== +# TEST 45: HARDWARE BRIDGE DAMPENERS +# ============================================================================== +def test_signal_processor_hardware_bridge_shield(physics_engine): + """Proves that Hardware Bridges (Serial/USB I/O) are forgiven for dynamic execution.""" + # 1. Raw Execution (Malicious) + m_raw, e_raw = create_synthetic_star(physics_engine, "raw_exec", 100, {"sec_danger": 10, "sec_io": 10}) + + # 2. Hardware Execution (Expected Arduino/Serial behavior) + m_hw, e_hw = create_synthetic_star(physics_engine, "hw_exec", 100, { + "sec_danger": 10, "sec_io": 10, "hardware_bridge": 10 + }) + + r_raw = physics_engine.calculate_risk_vector(m_raw, e_raw) + r_hw = physics_engine.calculate_risk_vector(m_hw, e_hw) + + idx_inj = physics_engine.RISK_SCHEMA.index("injection_surface") + + assert r_hw["risk_vector"][idx_inj] < r_raw["risk_vector"][idx_inj], "Hardware bridge shield failed to dampen injection risk!" + + +# ============================================================================== +# TEST 46: ALGORITHMIC DOS O(N) BYPASS +# ============================================================================== +def test_signal_processor_algorithmic_dos_linear_bypass(physics_engine): + """Ensures O(N) linear loops are ignored by the Algorithmic DoS equations.""" + m_linear, e_linear = create_synthetic_star(physics_engine, "linear_loop", 100, {"api": 10}) + # big_o_depth = 1 is standard O(N) + m_linear["functions"] = [{"name": "safe_loop", "loc": 50, "big_o_depth": 1, "db_complexity": 5}] + + r_linear = physics_engine.calculate_risk_vector(m_linear, e_linear) + idx_dos = physics_engine.RISK_SCHEMA.index("algorithmic_dos") + + # Because depth is < 2, the loop `continue` triggers and mass remains 0.0 + assert r_linear["risk_vector"][idx_dos] == 0.0, "O(N) linear loops should not trigger Algorithmic DoS!" + + +# ============================================================================== +# TEST 47: TIER 3 LANGUAGE FALLBACK +# ============================================================================== +def test_signal_processor_tier_3_language(physics_engine): + """Ensures esoteric/unstructured languages trigger Tier 3 physics modifiers.""" + m_t3, e_t3 = create_synthetic_star(physics_engine, "esoteric", 100, {"branch": 20}) + # "haskell" is not in the Tier 1 or Tier 2 explicit sets + m_t3["lang_id"] = "haskell" + + r_t3 = physics_engine.calculate_risk_vector(m_t3, e_t3) + + # If it didn't crash, the _get_tier fallback successfully returned "tier3" and pulled the correct physics vars + assert r_t3 is not None, "Tier 3 language fallback crashed the physics engine!" \ No newline at end of file