From e5a91fb9004a5cd1d3f1f2c25c33a099501d24ea Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Mon, 1 Jun 2026 15:16:52 +0200 Subject: [PATCH 1/2] Count 404 requests with foreign extensions as attack wave scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports AikidoSec/firewall-node#1041 to Python. Requests to foreign-platform extensions (php, java, jsp, etc.) are only counted as scan hits when the response status code is 404 — a 200 may indicate the Python app is proxying to another backend. Co-Authored-By: Claude Sonnet 4.6 --- .../sources/functions/request_handler.py | 2 +- .../storage/attack_wave_detector_store.py | 4 +- .../attack_wave_detector.py | 4 +- .../attack_wave_detection/is_web_scan_path.py | 20 ++++++- .../is_web_scan_path_test.py | 53 +++++++++++-------- .../attack_wave_detection/is_web_scanner.py | 4 +- .../is_web_scanner_test.py | 40 ++++++++------ 7 files changed, 82 insertions(+), 45 deletions(-) diff --git a/aikido_zen/sources/functions/request_handler.py b/aikido_zen/sources/functions/request_handler.py index 33d37ea09..c94accb19 100644 --- a/aikido_zen/sources/functions/request_handler.py +++ b/aikido_zen/sources/functions/request_handler.py @@ -106,7 +106,7 @@ def post_response(status_code): if cache.is_bypassed_ip(context.remote_address): return - attack_wave = attack_wave_detector_store.is_attack_wave(context) + attack_wave = attack_wave_detector_store.is_attack_wave(context, status_code) if attack_wave: cache.stats.on_detected_attack_wave(blocked=False) diff --git a/aikido_zen/storage/attack_wave_detector_store.py b/aikido_zen/storage/attack_wave_detector_store.py index e256f53d0..5944879d9 100644 --- a/aikido_zen/storage/attack_wave_detector_store.py +++ b/aikido_zen/storage/attack_wave_detector_store.py @@ -10,9 +10,9 @@ def __init__(self): self._detector = AttackWaveDetector() self._lock = threading.RLock() # Reentrant lock for thread safety - def is_attack_wave(self, context: Context) -> bool: + def is_attack_wave(self, context: Context, status_code: int = 404) -> bool: with self._lock: - return self._detector.is_attack_wave(context) + return self._detector.is_attack_wave(context, status_code) def get_samples_for_ip(self, ip: str): with self._lock: diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py index 5897c6cb9..1a083587e 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/attack_wave_detector.py @@ -34,7 +34,7 @@ def __init__( time_to_live_in_ms=self.attack_wave_time_frame, ) - def is_attack_wave(self, context: Context) -> bool: + def is_attack_wave(self, context: Context, status_code: int = 404) -> bool: """ Function gets called with context to check if there is an attack wave request. """ @@ -45,7 +45,7 @@ def is_attack_wave(self, context: Context) -> bool: if self.sent_events_map.get(ip) is not None: return False - if not is_web_scanner(context): + if not is_web_scanner(context, status_code): return False # Increment suspicious requests count -> there is a new or first suspicious request diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py index eacf23454..3794cfbe6 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path.py @@ -16,15 +16,31 @@ "sqlitedb", "sqlite3db", } + +# Extensions that belong to other platforms (e.g. PHP, Java). +# A 200 response may mean the Python app is proxying to that backend, +# so we only count these as scan hits when the response is 404. +foreign_extensions = { + "php", + "php3", + "php4", + "php5", + "phtml", + "java", + "jsp", + "jspx", +} + filenames = {name.lower() for name in file_names} directories = {name.lower() for name in directory_names} -def is_web_scan_path(path: str) -> bool: +def is_web_scan_path(path: str, status_code: int = 404) -> bool: """ is_web_scan_path gets the current route and wants to determine whether it's a test by some web scanner. Checks filename if it exists (list of suspicious filenames & list of supsicious extensions) Checks all other segments for suspicious directories + Foreign-platform extensions (php, jsp, etc.) are only counted when status_code is 404. """ normalized = path.lower() segments = normalized.split("/") @@ -40,6 +56,8 @@ def is_web_scan_path(path: str) -> bool: ext = filename.split(".")[-1] if ext in file_extensions: return True + if ext in foreign_extensions and status_code == 404: + return True for directory in segments: if directory in directories: diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py index ee9d13fa1..2b7acbe1d 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scan_path_test.py @@ -8,30 +8,41 @@ def test_is_web_scan_path(): - assert is_web_scan_path("/.env") - assert is_web_scan_path("/test/.env") - assert is_web_scan_path("/test/.env.bak") - assert is_web_scan_path("/.git/config") - assert is_web_scan_path("/.aws/config") - assert is_web_scan_path("/some/path/.git/test") - assert is_web_scan_path("/some/path/.gitlab-ci.yml") - assert is_web_scan_path("/some/path/.github/workflows/test.yml") - assert is_web_scan_path("/.travis.yml") - assert is_web_scan_path("/../example/") - assert is_web_scan_path("/./test") - assert is_web_scan_path("/Cargo.lock") - assert is_web_scan_path("/System32/test") + assert is_web_scan_path("/.env", 404) + assert is_web_scan_path("/test/.env", 404) + assert is_web_scan_path("/test/.env.bak", 404) + assert is_web_scan_path("/.git/config", 404) + assert is_web_scan_path("/.aws/config", 404) + assert is_web_scan_path("/some/path/.git/test", 404) + assert is_web_scan_path("/some/path/.gitlab-ci.yml", 404) + assert is_web_scan_path("/some/path/.github/workflows/test.yml", 404) + assert is_web_scan_path("/.travis.yml", 404) + assert is_web_scan_path("/../example/", 404) + assert is_web_scan_path("/./test", 404) + assert is_web_scan_path("/Cargo.lock", 404) + assert is_web_scan_path("/System32/test", 404) def test_is_not_web_scan_path(): - assert not is_web_scan_path("/test/file.txt") - assert not is_web_scan_path("/some/route/to/file.txt") - assert not is_web_scan_path("/some/route/to/file.json") - assert not is_web_scan_path("/en") - assert not is_web_scan_path("/") - assert not is_web_scan_path("/test/route") - assert not is_web_scan_path("/static/file.css") - assert not is_web_scan_path("/static/file.a461f56e.js") + assert not is_web_scan_path("/test/file.txt", 404) + assert not is_web_scan_path("/some/route/to/file.txt", 404) + assert not is_web_scan_path("/some/route/to/file.json", 404) + assert not is_web_scan_path("/en", 404) + assert not is_web_scan_path("/", 404) + assert not is_web_scan_path("/test/route", 404) + assert not is_web_scan_path("/static/file.css", 404) + assert not is_web_scan_path("/static/file.a461f56e.js", 404) + + +def test_foreign_extensions_404(): + assert is_web_scan_path("/admin.php", 404) + assert is_web_scan_path("/app.jsp", 404) + + +def test_foreign_extensions_non_404(): + assert not is_web_scan_path("/admin.php", 200) + assert not is_web_scan_path("/admin.php", 301) + assert not is_web_scan_path("/app.jsp", 200) def test_no_duplicates_in_file_names(): diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py index e1638f89e..cb6053dac 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner.py @@ -10,10 +10,10 @@ ) -def is_web_scanner(context: Context) -> bool: +def is_web_scanner(context: Context, status_code: int = 404) -> bool: if context.method and is_web_scan_method(context.method): return True - if context.route and is_web_scan_path(context.route): + if context.route and is_web_scan_path(context.route, status_code): return True if query_params_contain_dangerous_strings(context): return True diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py index eb03a44f4..a60417f1f 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py @@ -22,22 +22,30 @@ def get_test_context(path="/", method="GET", query=None): def test_is_web_scanner(): - assert is_web_scanner(get_test_context("/wp-config.php", "GET")) - assert is_web_scanner(get_test_context("/.env", "GET")) - assert is_web_scanner(get_test_context("/test/.env.bak", "GET")) - assert is_web_scanner(get_test_context("/.git/config", "GET")) - assert is_web_scanner(get_test_context("/.aws/config", "GET")) - assert is_web_scanner(get_test_context("/../secret", "GET")) - assert is_web_scanner(get_test_context("/", "BADMETHOD")) - assert is_web_scanner(get_test_context("/", "GET", {"test": "SELECT * FROM admin"})) - assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"})) + assert is_web_scanner(get_test_context("/wp-config.php", "GET"), 404) + assert is_web_scanner(get_test_context("/.env", "GET"), 404) + assert is_web_scanner(get_test_context("/test/.env.bak", "GET"), 404) + assert is_web_scanner(get_test_context("/.git/config", "GET"), 404) + assert is_web_scanner(get_test_context("/.aws/config", "GET"), 404) + assert is_web_scanner(get_test_context("/../secret", "GET"), 404) + assert is_web_scanner(get_test_context("/", "BADMETHOD"), 404) + assert is_web_scanner(get_test_context("/", "GET", {"test": "SELECT * FROM admin"}), 404) + assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"}), 404) def test_is_not_web_scanner(): - assert not is_web_scanner(get_test_context("graphql", "POST")) - assert not is_web_scanner(get_test_context("/api/v1/users", "GET")) - assert not is_web_scanner(get_test_context("/public/index.html", "GET")) - assert not is_web_scanner(get_test_context("/static/js/app.js", "GET")) - assert not is_web_scanner(get_test_context("/uploads/image.png", "GET")) - assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"})) - assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"})) + assert not is_web_scanner(get_test_context("graphql", "POST"), 404) + assert not is_web_scanner(get_test_context("/api/v1/users", "GET"), 404) + assert not is_web_scanner(get_test_context("/public/index.html", "GET"), 404) + assert not is_web_scanner(get_test_context("/static/js/app.js", "GET"), 404) + assert not is_web_scanner(get_test_context("/uploads/image.png", "GET"), 404) + assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"}), 404) + assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"}), 404) + + +def test_foreign_extension_only_on_404(): + assert is_web_scanner(get_test_context("/admin.php", "GET"), 404) + assert not is_web_scanner(get_test_context("/admin.php", "GET"), 200) + assert not is_web_scanner(get_test_context("/admin.php", "GET"), 301) + assert is_web_scanner(get_test_context("/app.jsp", "GET"), 404) + assert not is_web_scanner(get_test_context("/app.jsp", "GET"), 200) From 3c131c898883cb79d616427328ce0f5c3d2642e2 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Mon, 1 Jun 2026 15:35:32 +0200 Subject: [PATCH 2/2] Fix mock assertion to include status_code and run black formatter --- aikido_zen/storage/attack_wave_detector_store_test.py | 4 ++-- .../attack_wave_detection/is_web_scanner_test.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/aikido_zen/storage/attack_wave_detector_store_test.py b/aikido_zen/storage/attack_wave_detector_store_test.py index 095f3962e..6e2344e1d 100644 --- a/aikido_zen/storage/attack_wave_detector_store_test.py +++ b/aikido_zen/storage/attack_wave_detector_store_test.py @@ -446,7 +446,7 @@ def test_mock_detector_integration(mock_detector_class): store = AttackWaveDetectorStore() context = test_utils.generate_context() - # Should use the mocked detector + # Should use the mocked detector (default status_code=404) result = store.is_attack_wave(context) assert result is True - mock_detector.is_attack_wave.assert_called_once_with(context) + mock_detector.is_attack_wave.assert_called_once_with(context, 404) diff --git a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py index a60417f1f..d5eb97e5e 100644 --- a/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py +++ b/aikido_zen/vulnerabilities/attack_wave_detection/is_web_scanner_test.py @@ -29,7 +29,9 @@ def test_is_web_scanner(): assert is_web_scanner(get_test_context("/.aws/config", "GET"), 404) assert is_web_scanner(get_test_context("/../secret", "GET"), 404) assert is_web_scanner(get_test_context("/", "BADMETHOD"), 404) - assert is_web_scanner(get_test_context("/", "GET", {"test": "SELECT * FROM admin"}), 404) + assert is_web_scanner( + get_test_context("/", "GET", {"test": "SELECT * FROM admin"}), 404 + ) assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"}), 404)