From 2451f6c41626f3d287bf26a62168c3c102e4c4dd Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Tue, 2 Jun 2026 18:49:29 +0200 Subject: [PATCH] Store body_raw as well as a fix for bb report --- aikido_zen/context/__init__.py | 11 +++- aikido_zen/context/init_test.py | 33 ++++++++++++ .../context_contains_sql_injection_test.py | 52 +++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/aikido_zen/context/__init__.py b/aikido_zen/context/__init__.py index 04a00e6a5..243df2e69 100644 --- a/aikido_zen/context/__init__.py +++ b/aikido_zen/context/__init__.py @@ -17,7 +17,7 @@ from .extract_route_params import extract_route_params from ..helpers.headers import Headers -UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"] +UINPUT_SOURCES = ["body", "body_raw", "cookies", "query", "headers", "xml", "route_params"] current_context = contextvars.ContextVar("current_context", default=None) WSGI_SOURCES = ["django", "flask"] @@ -79,6 +79,7 @@ def __reduce__(self): "remote_address": self.remote_address, "url": self.url, "body": self.body, + "body_raw": self.body_raw, "headers": self.headers, "query": self.query, "cookies": self.cookies, @@ -115,6 +116,7 @@ def set_body(self, body): def set_body_internal(self, body): """Sets the body and checks if it's possibly JSON""" self.body = body + self.body_raw = None if isinstance(self.body, (str, bytes)) and len(body) == 0: # Make sure that empty bodies like b"" don't get sent. self.body = None @@ -125,6 +127,12 @@ def set_body_internal(self, body): try: parsed_body = json.loads(self.body) if parsed_body: + # Save the raw decoded string so injection detection still works + # against code that reads the body as raw bytes/string. json.loads + # decodes unicode escapes (e.g. \u0023 -> #), creating a mismatch + # between self.body and what reaches the sink if the application + # reads request.data directly instead of request.json. 
+ self.body_raw = self.body.decode("utf-8", errors="replace") self.body = parsed_body return except (JSONDecodeError, ValueError): @@ -139,6 +147,7 @@ def set_body_internal(self, body): # Might be JSON, but might not have been parsed correctly by server because of wrong headers parsed_body = json.loads(self.body) if parsed_body: + self.body_raw = self.body self.body = parsed_body def get_route_metadata(self): diff --git a/aikido_zen/context/init_test.py b/aikido_zen/context/init_test.py index a8ca625e0..4d763698b 100644 --- a/aikido_zen/context/init_test.py +++ b/aikido_zen/context/init_test.py @@ -62,6 +62,7 @@ def test_wsgi_context_1(): "url": "https://example.com/hello", "query": {"user": ["JohnDoe"], "age": ["30", "35"]}, "body": 123, + "body_raw": None, "route": "/hello", "subdomains": [], "user": None, @@ -94,6 +95,7 @@ def test_wsgi_context_2(): "url": "http://localhost:8080/hello", "query": {"user": ["JohnDoe"], "age": ["30", "35"]}, "body": {"test": True}, + "body_raw": None, "route": "/hello", "subdomains": [], "user": None, @@ -317,3 +319,34 @@ def test_set_bytes_json_with_surrogate_bytes(): context = Context(req=basic_wsgi_req, body=body, source="flask") assert isinstance(context.body, dict) assert context.body.get("username") == {"$regex": ".*"} + + +def test_body_raw_set_when_bytes_json_parsed(): + # Regression: AIKIDO-FVRDOX5M — json.loads decodes unicode escapes (e.g. \u0023 -> #) + # so self.body has '#' but the application's raw read still has '\u0023'. + # body_raw must preserve the pre-decode string so detection finds the raw form. + body = b'"\\u0023 payload"' + context = Context(req=basic_wsgi_req, body=body, source="flask") + assert context.body == "# payload" + assert context.body_raw == '"\\u0023 payload"' + + +def test_body_raw_set_when_string_json_parsed(): + # Same bypass via a string body (framework already decoded bytes before set_body). 
+ body = '"\\u0023 payload"' + context = Context(req=basic_wsgi_req, body=body, source="flask") + assert context.body == "# payload" + assert context.body_raw == '"\\u0023 payload"' + + +def test_body_raw_none_when_no_json_parsing(): + # When the body is not JSON-parsed, body_raw should remain None. + context = Context(req=basic_wsgi_req, body=b"plain bytes", source="flask") + assert context.body == "plain bytes" + assert context.body_raw is None + + +def test_body_raw_none_for_non_string_body(): + context = Context(req=basic_wsgi_req, body={"key": "value"}, source="flask") + assert context.body == {"key": "value"} + assert context.body_raw is None diff --git a/aikido_zen/vulnerabilities/sql_injection/context_contains_sql_injection_test.py b/aikido_zen/vulnerabilities/sql_injection/context_contains_sql_injection_test.py index 7419705fd..3775799b5 100644 --- a/aikido_zen/vulnerabilities/sql_injection/context_contains_sql_injection_test.py +++ b/aikido_zen/vulnerabilities/sql_injection/context_contains_sql_injection_test.py @@ -1,5 +1,7 @@ import pytest from .context_contains_sql_injection import context_contains_sql_injection +from aikido_zen.context import Context +from aikido_zen.helpers.headers import Headers import aikido_zen.test_utils as test_utils @@ -30,3 +32,53 @@ def test_doesnt_crash_with_invalid_sql(invalid_input): dialect="mysql", ) assert result == {} + + +def _make_context_with_bytes_body(body_bytes): + """Create a minimal context whose body comes from raw bytes (as a real request would).""" + ctx = Context.__new__(Context) + ctx.cookies = {} + ctx.headers = Headers() + ctx.remote_address = "1.2.3.4" + ctx.method = "POST" + ctx.url = "http://localhost:5000/user" + ctx.query = {} + ctx.source = "flask" + ctx.route = "/user" + ctx.subdomains = [] + ctx.parsed_userinput = {} + ctx.xml = {} + ctx.outgoing_req_redirects = [] + ctx.user = None + ctx.rate_limit_group = None + ctx.executed_middleware = False + ctx.protection_forced_off = False + 
+ ctx.route_params = [] + ctx.set_body(body_bytes) + return ctx + + +def test_unicode_escape_sqli_bypass_via_bytes_body(): + # Regression: AIKIDO-FVRDOX5M — json.loads decodes \u0023 -> # so self.body has '#' + # but the sink receives the raw decoded string with literal '\u0023'. Without + # body_raw the firewall checks '#' against a query that has '\u0023' and misses it. + raw_payload = b'"\\' + b"u0023 ' Union Select password From users -- x\"" + ctx = _make_context_with_bytes_body(raw_payload) + + # body is the JSON-decoded form (with '#'); body_raw is the original decoded bytes string + assert ctx.body.startswith("#") + assert ctx.body_raw is not None + assert "\\u0023" in ctx.body_raw + + # The SQL query the application builds using request.data.decode() (raw bytes → string) + user_id_raw = ctx.body_raw # what reaches the sink when app reads raw body + sql = f"SELECT username FROM users WHERE id = '{user_id_raw}'" + + result = context_contains_sql_injection( + sql=sql, + operation="pymysql.execute", + context=ctx, + dialect="mysql", + ) + assert result != {}, "SQLi via unicode-escape bypass should be detected" + assert result["source"] == "body_raw"