Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion aikido_zen/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .extract_route_params import extract_route_params
from ..helpers.headers import Headers

UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"]
UINPUT_SOURCES = ["body", "body_raw", "cookies", "query", "headers", "xml", "route_params"]
current_context = contextvars.ContextVar("current_context", default=None)

WSGI_SOURCES = ["django", "flask"]
Expand Down Expand Up @@ -79,6 +79,7 @@ def __reduce__(self):
"remote_address": self.remote_address,
"url": self.url,
"body": self.body,
"body_raw": self.body_raw,
"headers": self.headers,
"query": self.query,
"cookies": self.cookies,
Expand Down Expand Up @@ -115,6 +116,7 @@ def set_body(self, body):
def set_body_internal(self, body):
"""Sets the body and checks if it's possibly JSON"""
self.body = body
self.body_raw = None
if isinstance(self.body, (str, bytes)) and len(body) == 0:
# Make sure that empty bodies like b"" don't get sent.
self.body = None
Expand All @@ -125,6 +127,12 @@ def set_body_internal(self, body):
try:
parsed_body = json.loads(self.body)
if parsed_body:
# Save the raw decoded string so injection detection still works
# against code that reads the body as raw bytes/string. json.loads
# decodes unicode escapes (e.g. \u0023 -> #), creating a mismatch
# between self.body and what reaches the sink if the application
# reads request.data directly instead of request.json.
self.body_raw = self.body.decode("utf-8", errors="replace")
self.body = parsed_body
return
except (JSONDecodeError, ValueError):
Expand All @@ -139,6 +147,7 @@ def set_body_internal(self, body):
# Might be JSON, but might not have been parsed correctly by server because of wrong headers
parsed_body = json.loads(self.body)
if parsed_body:
self.body_raw = self.body
self.body = parsed_body

def get_route_metadata(self):
Expand Down
33 changes: 33 additions & 0 deletions aikido_zen/context/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_wsgi_context_1():
"url": "https://example.com/hello",
"query": {"user": ["JohnDoe"], "age": ["30", "35"]},
"body": 123,
"body_raw": None,
"route": "/hello",
"subdomains": [],
"user": None,
Expand Down Expand Up @@ -94,6 +95,7 @@ def test_wsgi_context_2():
"url": "http://localhost:8080/hello",
"query": {"user": ["JohnDoe"], "age": ["30", "35"]},
"body": {"test": True},
"body_raw": None,
"route": "/hello",
"subdomains": [],
"user": None,
Expand Down Expand Up @@ -317,3 +319,34 @@ def test_set_bytes_json_with_surrogate_bytes():
context = Context(req=basic_wsgi_req, body=body, source="flask")
assert isinstance(context.body, dict)
assert context.body.get("username") == {"$regex": ".*"}


def test_body_raw_set_when_bytes_json_parsed():
    # Regression: AIKIDO-FVRDOX5M.
    # A bytes body holding a JSON string with an escaped '\u0023' is parsed,
    # so context.body ends up with the decoded '#', while the text on the
    # wire still carries the literal '\u0023' escape sequence.
    # body_raw has to keep that pre-parse text.
    raw_bytes = b'"\\u0023 payload"'
    ctx = Context(req=basic_wsgi_req, body=raw_bytes, source="flask")

    # Parsed view: escape decoded.
    assert ctx.body == "# payload"
    # Raw view: escape (and surrounding JSON quotes) untouched.
    assert ctx.body_raw == '"\\u0023 payload"'


def test_body_raw_set_when_string_json_parsed():
    # Str variant of the bytes case: the body is handed over as text that
    # still contains the JSON escape '\u0023'.
    raw_text = '"\\u0023 payload"'
    ctx = Context(req=basic_wsgi_req, body=raw_text, source="flask")

    # body is the JSON-decoded value, body_raw is the text exactly as given.
    assert ctx.body == "# payload"
    assert ctx.body_raw == '"\\u0023 payload"'


def test_body_raw_none_when_no_json_parsing():
    # A bytes body that is not valid JSON: body becomes the plain text and
    # no raw copy is recorded.
    plain = b"plain bytes"
    ctx = Context(req=basic_wsgi_req, body=plain, source="flask")

    assert ctx.body == "plain bytes"
    assert ctx.body_raw is None


def test_body_raw_none_for_non_string_body():
    # A body that is already a dict is stored as-is and body_raw stays None.
    structured = {"key": "value"}
    ctx = Context(req=basic_wsgi_req, body=structured, source="flask")

    assert ctx.body == {"key": "value"}
    assert ctx.body_raw is None
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest
from .context_contains_sql_injection import context_contains_sql_injection
from aikido_zen.context import Context
from aikido_zen.helpers.headers import Headers
import aikido_zen.test_utils as test_utils


Expand Down Expand Up @@ -30,3 +32,53 @@ def test_doesnt_crash_with_invalid_sql(invalid_input):
dialect="mysql",
)
assert result == {}


def _make_context_with_bytes_body(body_bytes):
    """Build a bare Context and route ``body_bytes`` through ``set_body``.

    The instance is created with ``Context.__new__`` so ``__init__`` is not
    run; the attributes below are assigned by hand before the body is set.
    """
    context = Context.__new__(Context)

    # Insertion order of the mapping mirrors the order the fields are assigned.
    field_values = {
        "cookies": {},
        "headers": Headers(),
        "remote_address": "1.2.3.4",
        "method": "POST",
        "url": "http://localhost:5000/user",
        "query": {},
        "source": "flask",
        "route": "/user",
        "subdomains": [],
        "parsed_userinput": {},
        "xml": {},
        "outgoing_req_redirects": [],
        "user": None,
        "rate_limit_group": None,
        "executed_middleware": False,
        "protection_forced_off": False,
        "route_params": [],
    }
    for field_name, field_value in field_values.items():
        setattr(context, field_name, field_value)

    # Go through the public setter so the body handling under test runs.
    context.set_body(body_bytes)
    return context


def test_unicode_escape_sqli_bypass_via_bytes_body():
    # Regression: AIKIDO-FVRDOX5M.
    # The payload is a JSON string starting with the escape '\u0023'. After
    # JSON parsing, context.body starts with '#', whereas the raw text still
    # contains the literal '\u0023'. A query built from the raw text therefore
    # never contains the parsed form, so only body_raw can match it.
    raw_payload = b'"\\' + b"u0023 ' Union Select password From users -- x\""
    context = _make_context_with_bytes_body(raw_payload)

    # Parsed form has the decoded '#'; raw form keeps the escape sequence.
    assert context.body.startswith("#")
    assert context.body_raw is not None
    assert "\\u0023" in context.body_raw

    # Simulate an application that interpolates the raw body text into SQL.
    user_id_raw = context.body_raw
    sql = f"SELECT username FROM users WHERE id = '{user_id_raw}'"

    detection = context_contains_sql_injection(
        sql=sql,
        operation="pymysql.execute",
        context=context,
        dialect="mysql",
    )

    assert detection != {}, "SQLi via unicode-escape bypass should be detected"
    assert detection["source"] == "body_raw"
Loading