Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aikido_zen/sources/functions/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def post_response(status_code):
if cache.is_bypassed_ip(context.remote_address):
return

attack_wave = attack_wave_detector_store.is_attack_wave(context)
attack_wave = attack_wave_detector_store.is_attack_wave(context, status_code)
if attack_wave:
cache.stats.on_detected_attack_wave(blocked=False)

Expand Down
4 changes: 2 additions & 2 deletions aikido_zen/storage/attack_wave_detector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ def __init__(self):
self._detector = AttackWaveDetector()
self._lock = threading.RLock() # Reentrant lock for thread safety

def is_attack_wave(self, context: Context) -> bool:
def is_attack_wave(self, context: Context, status_code: int = 404) -> bool:
with self._lock:
return self._detector.is_attack_wave(context)
return self._detector.is_attack_wave(context, status_code)

def get_samples_for_ip(self, ip: str):
with self._lock:
Expand Down
4 changes: 2 additions & 2 deletions aikido_zen/storage/attack_wave_detector_store_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ def test_mock_detector_integration(mock_detector_class):
store = AttackWaveDetectorStore()
context = test_utils.generate_context()

# Should use the mocked detector
# Should use the mocked detector (default status_code=404)
result = store.is_attack_wave(context)
assert result is True
mock_detector.is_attack_wave.assert_called_once_with(context)
mock_detector.is_attack_wave.assert_called_once_with(context, 404)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
time_to_live_in_ms=self.attack_wave_time_frame,
)

def is_attack_wave(self, context: Context) -> bool:
def is_attack_wave(self, context: Context, status_code: int = 404) -> bool:
"""
Function gets called with context to check if there is an attack wave request.
"""
Expand All @@ -45,7 +45,7 @@ def is_attack_wave(self, context: Context) -> bool:
if self.sent_events_map.get(ip) is not None:
return False

if not is_web_scanner(context):
if not is_web_scanner(context, status_code):
return False

# Increment suspicious requests count -> there is a new or first suspicious request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,31 @@
"sqlitedb",
"sqlite3db",
}

# Extensions that belong to other platforms (e.g. PHP, Java).
# A 200 response may mean the Python app is proxying to that backend,
# so we only count these as scan hits when the response is 404.
foreign_extensions = {
"php",
"php3",
"php4",
"php5",
"phtml",
"java",
"jsp",
"jspx",
}

filenames = {name.lower() for name in file_names}
directories = {name.lower() for name in directory_names}


def is_web_scan_path(path: str) -> bool:
def is_web_scan_path(path: str, status_code: int = 404) -> bool:
"""
is_web_scan_path gets the current route and wants to determine whether it's a test by some web scanner.
Checks filename if it exists (list of suspicious filenames & list of supsicious extensions)
Checks all other segments for suspicious directories
Foreign-platform extensions (php, jsp, etc.) are only counted when status_code is 404.
"""
normalized = path.lower()
segments = normalized.split("/")
Expand All @@ -40,6 +56,8 @@ def is_web_scan_path(path: str) -> bool:
ext = filename.split(".")[-1]
if ext in file_extensions:
return True
if ext in foreign_extensions and status_code == 404:
return True

for directory in segments:
if directory in directories:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,41 @@


def test_is_web_scan_path():
assert is_web_scan_path("/.env")
assert is_web_scan_path("/test/.env")
assert is_web_scan_path("/test/.env.bak")
assert is_web_scan_path("/.git/config")
assert is_web_scan_path("/.aws/config")
assert is_web_scan_path("/some/path/.git/test")
assert is_web_scan_path("/some/path/.gitlab-ci.yml")
assert is_web_scan_path("/some/path/.github/workflows/test.yml")
assert is_web_scan_path("/.travis.yml")
assert is_web_scan_path("/../example/")
assert is_web_scan_path("/./test")
assert is_web_scan_path("/Cargo.lock")
assert is_web_scan_path("/System32/test")
assert is_web_scan_path("/.env", 404)
assert is_web_scan_path("/test/.env", 404)
assert is_web_scan_path("/test/.env.bak", 404)
assert is_web_scan_path("/.git/config", 404)
assert is_web_scan_path("/.aws/config", 404)
assert is_web_scan_path("/some/path/.git/test", 404)
assert is_web_scan_path("/some/path/.gitlab-ci.yml", 404)
assert is_web_scan_path("/some/path/.github/workflows/test.yml", 404)
assert is_web_scan_path("/.travis.yml", 404)
assert is_web_scan_path("/../example/", 404)
assert is_web_scan_path("/./test", 404)
assert is_web_scan_path("/Cargo.lock", 404)
assert is_web_scan_path("/System32/test", 404)


def test_is_not_web_scan_path():
assert not is_web_scan_path("/test/file.txt")
assert not is_web_scan_path("/some/route/to/file.txt")
assert not is_web_scan_path("/some/route/to/file.json")
assert not is_web_scan_path("/en")
assert not is_web_scan_path("/")
assert not is_web_scan_path("/test/route")
assert not is_web_scan_path("/static/file.css")
assert not is_web_scan_path("/static/file.a461f56e.js")
assert not is_web_scan_path("/test/file.txt", 404)
assert not is_web_scan_path("/some/route/to/file.txt", 404)
assert not is_web_scan_path("/some/route/to/file.json", 404)
assert not is_web_scan_path("/en", 404)
assert not is_web_scan_path("/", 404)
assert not is_web_scan_path("/test/route", 404)
assert not is_web_scan_path("/static/file.css", 404)
assert not is_web_scan_path("/static/file.a461f56e.js", 404)


def test_foreign_extensions_404():
assert is_web_scan_path("/admin.php", 404)
assert is_web_scan_path("/app.jsp", 404)


def test_foreign_extensions_non_404():
assert not is_web_scan_path("/admin.php", 200)
assert not is_web_scan_path("/admin.php", 301)
assert not is_web_scan_path("/app.jsp", 200)


def test_no_duplicates_in_file_names():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
)


def is_web_scanner(context: Context) -> bool:
def is_web_scanner(context: Context, status_code: int = 404) -> bool:
if context.method and is_web_scan_method(context.method):
return True
if context.route and is_web_scan_path(context.route):
if context.route and is_web_scan_path(context.route, status_code):
return True
if query_params_contain_dangerous_strings(context):
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,32 @@ def get_test_context(path="/", method="GET", query=None):


def test_is_web_scanner():
assert is_web_scanner(get_test_context("/wp-config.php", "GET"))
assert is_web_scanner(get_test_context("/.env", "GET"))
assert is_web_scanner(get_test_context("/test/.env.bak", "GET"))
assert is_web_scanner(get_test_context("/.git/config", "GET"))
assert is_web_scanner(get_test_context("/.aws/config", "GET"))
assert is_web_scanner(get_test_context("/../secret", "GET"))
assert is_web_scanner(get_test_context("/", "BADMETHOD"))
assert is_web_scanner(get_test_context("/", "GET", {"test": "SELECT * FROM admin"}))
assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"}))
assert is_web_scanner(get_test_context("/wp-config.php", "GET"), 404)
assert is_web_scanner(get_test_context("/.env", "GET"), 404)
assert is_web_scanner(get_test_context("/test/.env.bak", "GET"), 404)
assert is_web_scanner(get_test_context("/.git/config", "GET"), 404)
assert is_web_scanner(get_test_context("/.aws/config", "GET"), 404)
assert is_web_scanner(get_test_context("/../secret", "GET"), 404)
assert is_web_scanner(get_test_context("/", "BADMETHOD"), 404)
assert is_web_scanner(
get_test_context("/", "GET", {"test": "SELECT * FROM admin"}), 404
)
assert is_web_scanner(get_test_context("/", "GET", {"test": "../etc/passwd"}), 404)


def test_is_not_web_scanner():
assert not is_web_scanner(get_test_context("graphql", "POST"))
assert not is_web_scanner(get_test_context("/api/v1/users", "GET"))
assert not is_web_scanner(get_test_context("/public/index.html", "GET"))
assert not is_web_scanner(get_test_context("/static/js/app.js", "GET"))
assert not is_web_scanner(get_test_context("/uploads/image.png", "GET"))
assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"}))
assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"}))
assert not is_web_scanner(get_test_context("graphql", "POST"), 404)
assert not is_web_scanner(get_test_context("/api/v1/users", "GET"), 404)
assert not is_web_scanner(get_test_context("/public/index.html", "GET"), 404)
assert not is_web_scanner(get_test_context("/static/js/app.js", "GET"), 404)
assert not is_web_scanner(get_test_context("/uploads/image.png", "GET"), 404)
assert not is_web_scanner(get_test_context("/", "GET", {"test": "1'"}), 404)
assert not is_web_scanner(get_test_context("/", "GET", {"test": "abcd"}), 404)


def test_foreign_extension_only_on_404():
assert is_web_scanner(get_test_context("/admin.php", "GET"), 404)
assert not is_web_scanner(get_test_context("/admin.php", "GET"), 200)
assert not is_web_scanner(get_test_context("/admin.php", "GET"), 301)
assert is_web_scanner(get_test_context("/app.jsp", "GET"), 404)
assert not is_web_scanner(get_test_context("/app.jsp", "GET"), 200)
Loading