Merged
150 changes: 150 additions & 0 deletions docs/wiki/08-24-Big-O-Detection.md
@@ -0,0 +1,150 @@
# Algorithmic DoS & Big-O Detection

> **Metric: Algorithmic DoS & Data Gravity**
>
> **TL;DR:** Performance is no longer just a latency issue; it is a critical security perimeter. A standalone $O(N^3)$ loop calculating math offline is harmless. But that exact same $O(N^3)$ loop attached to a public API endpoint and a database query? That is a ticking Algorithmic Denial of Service (DoS) bomb.
>
> GitGalaxy abandons flat volumetric counting in favor of an **N-Dimensional Physics Equation**. We evaluate the mathematical depth of your logic and multiply it by its "Data Gravity" and network exposure.
>
> **Effect:** Maps directly to the GitGalaxy Universal Risk Spectrum.
> * 🟦 **VERY LOW (Score 0-19):** Linear $O(N)$ execution or safely shielded streams.
> * 🟨 **INTERMEDIATE (Score 40-59):** Moderate. $O(N^2)$ logic that is mostly isolated or well-guarded by safety bailouts.
> * 🟥 **VERY HIGH (Score 80-100+):** Asymmetrical Threat. Recursive $O(2^N)$ or $O(N^3)$ loops directly wired into unauthenticated APIs or state-mutating database calls.

## The Philosophy: The Asymmetrical Attack Surface (CTO Pitch)

Traditional SAST tools fail to catch algorithmic vulnerabilities because they are designed to grep for known CVEs or hardcoded secrets. But an Algorithmic DoS isn't a CVE—it is a structural reality of your architecture.

A healthy engineering culture admits that human working memory is a finite resource. CPU threads and RAM are the equivalent finite bottlenecks for your infrastructure. If an attacker discovers an unprotected $O(N^4)$ function, they don't need a botnet to take down your servers. A single, well-crafted payload with a large enough input can pin your core processing threads in loops that take effectively unbounded time, starving the system of resources and causing a catastrophic outage.

GitGalaxy maps the "syntactic physics" of your codebase to identify where exponential complexity collides with public exposure. By surfacing these asymmetrical attack surfaces deterministically, we empower teams to secure their architecture *before* it reaches production.

## The Inputs: Variables of the Algorithmic Engine

We calculate the physical threat of a function by measuring its depth, its environment, and its structural defenses.

| Variable | Metric Focus | Multiplier | Human Translation |
| :--- | :--- | :--- | :--- |
| `big_o_depth` | Algorithmic Depth | Quadratic ($depth^2$) | Evaluates nesting. $O(N)$ is ignored. $O(N^2)$, $O(N^3)$, and recursive $O(2^N)$ raise the base threat with the square of the detected depth. |
| `api` / `io` | Choke Points | Additive | Functions exposed to network requests or I/O act as weaponizable triggers. |
| `db_complexity` | Data Gravity | Additive | Heavy iteration paired with ORM/SQL queries generates severe database locking risks. |
| `flux` | State Mutation | Additive | Mutating variables inside heavy loops quickly leads to Out of Memory (OOM) bombs. |
| `safety` / `bailout` | The Guardrails | 0.5x (Dampener) | Break statements, return limits, and try/catch blocks act as structural circuit breakers, slicing the risk in half. |
| `lazy_evaluation` | OOM Shield | 0.5x (Dampener) | Generators and streams process data in $O(1)$ memory, neutralizing state flux threats. |
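
As a rough illustration of how these variables travel together, the sketch below builds one per-function record and reads the inputs back out with safe defaults. The keys `big_o_depth`, `db_complexity`, `hit_vector`, `api`, `io`, `flux`, `safety`, and `bailout_hits` are taken from the reference implementation further down; the `name` field, the `summarize` helper, and all values are hypothetical, and a depth of 3 is assumed to correspond to $O(N^3)$ nesting.

```python
from typing import Any, Dict

# Hypothetical record for a single function; the values are invented.
example_function: Dict[str, Any] = {
    "name": "export_report",   # hypothetical label, not used by the scoring math
    "big_o_depth": 3,          # assumed to mean three levels of nesting
    "db_complexity": 2,        # weight of DB work inside the loops
    "hit_vector": {
        "api": 1,
        "io": 0,
        "flux": 2,
        "safety": 0,
        "bailout_hits": 1,
    },
}


def summarize(func: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the engine inputs from a record, defaulting missing keys."""
    hv = func.get("hit_vector", {})
    return {
        "depth": func.get("big_o_depth", 1),
        "amplifiers": hv.get("api", 0) + hv.get("io", 0) + hv.get("flux", 0),
        "data_gravity": func.get("db_complexity", 0),
        "guarded": (hv.get("safety", 0) + hv.get("bailout_hits", 0)) > 0,
    }


print(summarize(example_function))
```

Reading every key with a default means a record with no hits at all degrades to a depth-1, unamplified, unguarded function rather than raising.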

## Universal Framework Integration

As with all core physics calculations, the DoS engine is deeply integrated with the ecosystem context:
* **Network Popularity (Blast Radius):** An $O(N^3)$ loop in a globally imported "God Node" multiplies the threat across the entire repository. If the function is an orphaned utility with no inbound network edges, the risk is heavily dampened.
* **Agentic / Hardware Shields:** Physics engine mitigations are dynamically applied. If the complex execution is occurring within a closed-loop native hardware bridge or a deterministic ML pipeline, standard Web-DoS math is scaled back.
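
The Blast Radius scaling can be sketched as a small standalone function. The branch logic and constants (`0.10`, `log1p(popularity) / 5.0`, cap of `3.0`) mirror the reference implementation further down; the function name `network_multiplier` and the parameter `file_api_hits` are illustrative, and the Agentic / Hardware Shields are not modeled here.

```python
import math


def network_multiplier(popularity: int, file_api_hits: int) -> float:
    """Scale threat mass by how widely the file is imported (illustrative sketch)."""
    if popularity == 0 and file_api_hits == 0:
        # Orphaned utility with no inbound edges and no API exposure.
        return 0.10
    if popularity > 0:
        # Logarithmic growth with inbound edges, capped at 3.0.
        return min(1.0 + math.log1p(popularity) / 5.0, 3.0)
    # No inbound edges, but the file itself exposes an API: neutral.
    return 1.0


for pop in (0, 1, 10, 100, 1000):
    print(pop, round(network_multiplier(pop, file_api_hits=0), 3))
```

Because the growth is logarithmic, a popularity of 10 yields roughly 1.48 and 100 yields roughly 1.92; the 3.0 cap is only reached at around 22,000 inbound edges.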

## The Mathematics: The Density of Complexity

We use a logistic (sigmoid) function centered at a density of 15 with a slope of 0.3, so low-density files score near the bottom of the scale and the score climbs steeply once density passes the midpoint.

**Step A: The Base Threat**
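We evaluate the structural depth. Functions with a `big_o_depth` below 2 (anything below $O(N^2)$) are skipped; for the rest, the base threat grows with the square of the depth.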
$$BaseThreat=BigODepth^2$$

**Step B: The Asymmetrical Amplifiers (Data Gravity & Choke Points)**
We calculate the environmental multipliers. A deep loop is only dangerous if it interacts with external state or heavy data volumes.
$$ChokeMultiplier=1.0+APIHits+IOHits+FluxHits$$
$$GravityMultiplier=1.0+(DBComplexity\times0.5)$$
$$ThreatMass=BaseThreat\times ChokeMultiplier\times GravityMultiplier$$

**Step C: Structural Dampeners (The Circuit Breakers)**
If the engine detects at least one safety bailout (`break`, `return`, `limit`) or lazy evaluation (generators) in a function, that function's mass is halved. The dampener is applied once per function, not once per hit.
$$MitigatedMass=ThreatMass\times0.5$$
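
Steps A through C can be traced with a short worked example. This is a minimal sketch of the three formulas above for a single function; the function name `mitigated_mass` and its keyword arguments are illustrative, and `guarded` stands in for "at least one safety or bailout hit was detected".

```python
def mitigated_mass(
    big_o_depth: int,
    api_hits: int = 0,
    io_hits: int = 0,
    flux_hits: int = 0,
    db_complexity: int = 0,
    guarded: bool = False,
) -> float:
    """Apply Steps A-C to one function (illustrative sketch)."""
    if big_o_depth < 2:
        return 0.0  # linear code is ignored

    base_threat = float(big_o_depth**2)                 # Step A
    choke = 1.0 + api_hits + io_hits + flux_hits        # Step B
    gravity = 1.0 + db_complexity * 0.5                 # Step B
    threat_mass = base_threat * choke * gravity

    return threat_mass * 0.5 if guarded else threat_mass  # Step C


# Depth-3 loop, offline: only the base threat remains.
print(mitigated_mass(3))                                              # 9.0
# Same loop behind an API endpoint with DB work: 9 * 2 * 2.
print(mitigated_mass(3, api_hits=1, db_complexity=2))                 # 36.0
# Same again, with a bailout present: halved.
print(mitigated_mass(3, api_hits=1, db_complexity=2, guarded=True))   # 18.0
```

The same depth-3 loop moves from 9 to 36 purely because of where it is wired, which is the asymmetry the TL;DR describes.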

**Step D: The Sigmoid Clamp**
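We sum the mitigated mass of every function in the file, scale the total by the Blast Radius network multiplier, and normalize it against the physical lines of code to find the structural density. The density is then mapped to the GitGalaxy 0-100 spectrum; in the reference implementation the result is also multiplied by the per-metric multiplier `mp` and capped at 100.
$$Density=\left(\frac{NetworkMultiplier\times\sum MitigatedMass}{\max(LOC+150,\ 1)}\right)\times100.0$$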
$$FinalRisk=\frac{100.0}{1+e^{-0.3\times(Density-15.0)}}$$
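
To get a feel for the clamp, the sketch below evaluates the density and sigmoid formulas for a given threat mass. The defaults (`150`, `15.0`, `0.3`) are the constants shown in the equations; the function name `final_risk` is illustrative, and the sketch omits the overflow guard and final multiplier that the reference implementation applies.

```python
import math


def final_risk(
    mass: float,
    loc: int,
    loc_padding: int = 150,
    threshold: float = 15.0,
    slope: float = 0.3,
) -> float:
    """Map a threat mass to a 0-100 score via density and a sigmoid (sketch)."""
    density = mass / max(loc + loc_padding, 1) * 100.0
    return 100.0 / (1.0 + math.exp(-slope * (density - threshold)))


# A 50-line file: the padding makes the denominator 200.
print(round(final_risk(30.0, loc=50), 2))  # density 15 sits at the midpoint
print(round(final_risk(36.0, loc=50), 2))  # density 18 lands in the low 70s
print(round(final_risk(9.0, loc=50), 2))   # density 4.5 stays in single digits
```

A density equal to the threshold always scores 50, and the 150-line padding keeps very short files from producing extreme densities on a single nested loop.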

---

## Implementation (Python Reference)

```python
import math
from typing import Dict, List, Any


def _calc_algorithmic_dos(
    self,
    loc: int,
    eq: Dict[str, int],
    mp: float,
    functions: List[Dict[str, Any]],
    popularity: int,
) -> float:
    if not functions:
        return 0.0

    dos_mass = 0.0

    for func in functions:
        depth = func.get("big_o_depth", 1)
        # 1. Ignore O(N) linear loops
        if depth < 2:
            continue

        # 2. Base Threat (grows with the square of the nesting depth)
        func_threat = float(depth**2)

        # 3. Data Gravity & Network Choke Points
        db_complex = func.get("db_complexity", 0)
        if db_complex > 0:
            func_threat *= 1.0 + (db_complex * 0.5)

        hv = func.get("hit_vector", {})
        choke_multiplier = 1.0 + hv.get("api", 0) + hv.get("io", 0) + hv.get("flux", 0)
        func_threat *= choke_multiplier

        # 4. Structural Dampeners (Guardrails), applied once per function
        safety_hits = hv.get("safety", 0) + hv.get("bailout_hits", 0)
        if safety_hits > 0:
            func_threat *= 0.5  # 50% reduction for bounded iteration

        dos_mass += func_threat

    if dos_mass == 0.0:
        return 0.0

    # 5. Network Posture (Blast Radius)
    network_multiplier = 1.0
    if popularity == 0 and eq.get("api", 0) == 0:
        network_multiplier = 0.10  # Safely isolated orphan
    elif popularity > 0:
        network_multiplier = min(1.0 + (math.log1p(popularity) / 5.0), 3.0)

    total_threat_mass = dos_mass * network_multiplier

    # 6. The Sigmoid Curve
    t = self.risk_tuning.get("algorithmic_dos", {})
    density = (total_threat_mass / max(loc + t.get("loc_padding", 150), 1)) * 100.0

    threshold = t.get("threshold_base", 15.0)

    try:
        score = 100.0 / (1.0 + math.exp(-t.get("sigmoid_slope", 0.3) * (density - threshold)))
    except OverflowError:
        # exp() overflows only for a large positive exponent, i.e. density far below threshold
        score = 100.0 if density > threshold else 0.0

    # Apply the per-metric multiplier and cap at 100
    return min(score * mp, 100.0)
```

<br><br>

---

### 🌌 Powered by the blAST Engine

This documentation is part of the [GitGalaxy Ecosystem](https://github.com/squid-protocol/gitgalaxy), an AST-free, LLM-free heuristic knowledge graph engine.

* 🧠 **[Deep Dive into the Physics Source Code](https://github.com/squid-protocol/gitgalaxy/tree/main/gitgalaxy/physics)** to see the math in action.
* 🪐 **[Explore the GitHub Repository](https://github.com/squid-protocol/gitgalaxy)** for code, tools, and updates.
* 🔭 **[Visualize your own repository at GitGalaxy.io](https://gitgalaxy.io/)** using our interactive 3D WebGPU dashboard.

---

**[⬅️ Back to Master Index](index.md)**
1 change: 1 addition & 0 deletions gitgalaxy/galaxyscope.py
@@ -47,6 +47,7 @@

try:
import yaml

HAS_PYYAML = True
except ImportError:
HAS_PYYAML = False
130 changes: 121 additions & 9 deletions gitgalaxy/physics/signal_processor.py
@@ -222,13 +222,27 @@ def calculate_risk_vector(
umbrella_bonus: float = 0.0,
) -> Dict[str, Any]:
"""Calculates risk exposure, temporal physics, and per-file physical impact."""
loc = max(meta.get("coding_loc", 1), 1)
total_loc = max(meta.get("total_loc", loc), 1)
doc_lines = meta.get("doc_loc", 0)
lang_id = meta.get("lang_id", "undeterminable")
rel_path = meta.get("path", "unknown")
loc = 1 # Safe fallback for the except block

try:
try:
loc = max(int(meta.get("coding_loc", 1)), 1)
except (ValueError, TypeError):
loc = 1

try:
total_loc = max(int(meta.get("total_loc", loc)), 1)
except (ValueError, TypeError):
total_loc = loc

try:
doc_lines = int(meta.get("doc_loc", 0))
except (ValueError, TypeError):
doc_lines = 0

lang_id = meta.get("lang_id", "undeterminable")

import os

filename = os.path.basename(rel_path).lower()
@@ -719,6 +733,9 @@ def calculate_risk_vector(
"churn": 0.0,
"documentation": doc_score,
"civil_war": self._calc_civil_war(equations),
"algorithmic_dos": self._calc_algorithmic_dos(
loc, equations, mp_map.get("algorithmic_dos", 1.0), functions, popularity
),
# ---> BIAXIAL WEAPONIZATION <---
"obscured_payload": self._calc_obscured_payload(
loc,
@@ -735,6 +752,7 @@ def calculate_risk_vector(
global_archetype,
global_drift,
local_drift,
max_big_o,
),
"injection_surface": self._calc_injection_surface(
loc,
@@ -1833,6 +1851,7 @@ def _calc_logic_bomb(
archetype: str,
global_drift: float,
local_drift: float,
max_big_o: int = 1,
) -> float:
"""
Calculates Logic Bomb / Sabotage Exposure.
@@ -1860,6 +1879,22 @@ def _calc_logic_bomb(
# ---> APPLY THE ARCHETYPE CONTEXT <---
sabotage_mass = (trigger * payload) * arch_multiplier

# ---> THE ALGORITHMIC DOS SPIKE (Big-O Vulnerability) <---
if max_big_o >= 3:
# 1. API/IO Choke Point (User-Controlled N or Network Latency)
attack_surface = eq.get("api", 0) + eq.get("sec_io", 0) + eq.get("io", 0)
dos_mass = attack_surface * (max_big_o**2) * 10.0

# 2. State Flux Bomb (Memory Exhaustion)
flux = eq.get("flux", 0) + eq.get("globals", 0)
dos_mass += flux * (max_big_o**2) * 5.0

# 3. The Shielding Dampener (Safety Guardrails)
if eq.get("safety", 0) > 0 or eq.get("bailout_hits", 0) > 0:
dos_mass *= 0.25 # 75% reduction if guardrails exist

sabotage_mass += dos_mass

# ---> THE TAINT SPIKE <---
# If the LHS Slicer confirmed data crossed from I/O to Danger, risk is absolute.
taint_confirmed = eq.get("sec_tainted_injection", 0)
@@ -1876,6 +1911,9 @@ def _calc_logic_bomb(
return 0.0

explicit_threats = eq.get("sec_graveyard", 0) + eq.get("sec_heat_triggers", 0)
if max_big_o >= 3:
explicit_threats += 1 # Preserve DoS Mass from being zeroed out

if explicit_threats == 0 and taint_confirmed == 0 and not getattr(self, "is_paranoid", False):
sabotage_mass *= 0.05

@@ -2072,6 +2110,76 @@ def _calc_secrets_risk(self, loc: int, eq: Dict[str, int], mp: float) -> float:

return min(score * mp, 100.0)

    def _calc_algorithmic_dos(
        self,
        loc: int,
        eq: Dict[str, int],
        mp: float,
        functions: List[Dict[str, Any]],
        popularity: int,
    ) -> float:
        """
        Calculates Algorithmic DoS Exposure based on Big-O depth, data gravity, and network choke points.
        """
        if not functions:
            return 0.0

        dos_mass = 0.0

        for func in functions:
            depth = func.get("big_o_depth", 1)
            if depth < 2:
                continue

            # 1. The Base Threat (grows with the square of the nesting depth)
            func_threat = float(depth**2)

            # 2. The Amplifiers (Network & Data Gravity)
            db_complex = func.get("db_complexity", 0)
            if db_complex > 0:
                func_threat *= 1.0 + (db_complex * 0.5)

            hv = func.get("hit_vector", {})
            api_hits = hv.get("api", 0)
            io_hits = hv.get("io", 0) + hv.get("sec_io", 0)
            flux_hits = hv.get("flux", 0) + hv.get("globals", 0)

            choke_multiplier = 1.0 + api_hits + io_hits + flux_hits
            func_threat *= choke_multiplier

            # 3. The Dampeners (Guardrails), applied once per function
            safety_hits = hv.get("safety", 0) + hv.get("bailout_hits", 0) + hv.get("cleanup", 0)
            if safety_hits > 0:
                func_threat *= 0.5  # 50% reduction for bounded iteration

            dos_mass += func_threat

        if dos_mass == 0.0:
            return 0.0

        # Apply File-Level Network Dampeners/Amplifiers
        network_multiplier = 1.0
        if popularity == 0 and eq.get("api", 0) == 0:
            network_multiplier = 0.10  # Safely isolated orphan
        elif popularity > 0:
            network_multiplier = min(1.0 + (math.log1p(popularity) / 5.0), 3.0)

        total_threat_mass = dos_mass * network_multiplier

        # Fetch tuning parameters
        t = self.risk_tuning.get("algorithmic_dos", {})
        density = (total_threat_mass / max(loc + t.get("loc_padding", 150), 1)) * 100.0

        threshold = t.get("threshold_base", 15.0)
        slope = t.get("sigmoid_slope", 0.3)

        try:
            score = 100.0 / (1.0 + math.exp(-slope * (density - threshold)))
        except OverflowError:
            score = 100.0 if density > threshold else 0.0

        return min(score * mp, 100.0)

# --------------------------------------------------------------------------
# REPORTING UTILITIES
# --------------------------------------------------------------------------
@@ -2106,8 +2214,10 @@ def generate_forensic_report(self, parsed_files: List[Dict[str, Any]]) -> Dict[s

def get_cumulative_risk(f):
rv = f.get("risk_vector", [])
if not isinstance(rv, list):
return 0.0
# Sum all exposures except civil_war
return sum(val for i, val in enumerate(rv) if i != civil_war_idx and i < len(rv))
return sum(val for i, val in enumerate(rv) if i != civil_war_idx and i < len(rv) and isinstance(val, (int, float)))

sorted_by_cumulative = sorted(active_files, key=get_cumulative_risk, reverse=True)

@@ -2124,16 +2234,17 @@ def get_cumulative_risk(f):

for file_data in active_files:
net = file_data.get("telemetry", {}).get("network_metrics", {})
rv = file_data.get("risk_vector", [])
raw_rv = file_data.get("risk_vector", [])
rv = raw_rv if isinstance(raw_rv, list) else []
p = file_data.get("path", "")

btw = net.get("betweenness_score") or 0.0
close = net.get("closeness_score") or 0.0
pr = net.get("normalized_blast_radius") or 0.0

flux_risk = rv[flux_idx] if flux_idx >= 0 and len(rv) > flux_idx else 0.0
err_risk = rv[err_idx] if err_idx >= 0 and len(rv) > err_idx else 0.0
doc_risk = rv[doc_idx] if doc_idx >= 0 and len(rv) > doc_idx else 0.0
flux_risk = float(rv[flux_idx]) if flux_idx >= 0 and len(rv) > flux_idx and isinstance(rv[flux_idx], (int, float)) else 0.0
err_risk = float(rv[err_idx]) if err_idx >= 0 and len(rv) > err_idx and isinstance(rv[err_idx], (int, float)) else 0.0
doc_risk = float(rv[doc_idx]) if doc_idx >= 0 and len(rv) > doc_idx and isinstance(rv[doc_idx], (int, float)) else 0.0

bottlenecks["contagious_mutation"].append(
{
@@ -2211,6 +2322,7 @@ def _get_locational_multipliers(self, path: str) -> Dict[str, float]:
"State Flux Exposure": "flux",
"Specification Exposure": "spec",
"Churn Exposure": "churn",
"Algorithmic DoS Exposure": "algorithmic_dos",
# --- SECURITY LENSES ---
"Obscured Payload Exposure": "obscured",
"Logic Bomb Exposure": "logic_bomb",
1 change: 1 addition & 0 deletions gitgalaxy/recorders/llm_recorder.py
@@ -709,6 +709,7 @@ def _build_markdown(
"injection_surface",
"memory_corruption",
"secrets_risk",
"algorithmic_dos",
]
vuln_found = False
for v_key in vuln_keys: