Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion aikido_zen/vulnerabilities/nosql_injection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def match_filter_part_in_user(user_input, filter_part, path_to_payload=None):

if is_mapping(user_input):
filtered_input = remove_keys_that_dont_start_with_dollar_sign(user_input)
if filtered_input == filter_part:
if is_user_operators_subset_of(filtered_input, filter_part):
return {
"match": True,
"pathToPayload": build_path_to_payload(path_to_payload),
Expand Down Expand Up @@ -57,6 +57,20 @@ def remove_keys_that_dont_start_with_dollar_sign(nosql_filter):
return {key: value for key, value in nosql_filter.items() if key.startswith("$")}


def is_user_operators_subset_of(user_operators, filter_operators):
"""
Returns True if every operator in user_operators is present in filter_operators
with the same value — i.e. the user-supplied operators are a subset of the filter.
An empty user_operators dict never matches (no operators = no injection).
"""
has_keys = False
for key, value in user_operators.items():
if key not in filter_operators or filter_operators[key] != value:
return False
has_keys = True
return has_keys
Comment on lines +60 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_user_operators_subset_of uses a has_keys flag to detect empty input. Add an early guard (if not user_operators: return False) to simplify the function and remove the auxiliary variable.

Show fix
Suggested change
def is_user_operators_subset_of(user_operators, filter_operators):
"""
Returns True if every operator in user_operators is present in filter_operators
with the same valuei.e. the user-supplied operators are a subset of the filter.
An empty user_operators dict never matches (no operators = no injection).
"""
has_keys = False
for key, value in user_operators.items():
if key not in filter_operators or filter_operators[key] != value:
return False
has_keys = True
return has_keys
def is_user_operators_subset_of(user_operators, filter_operators):
if not user_operators:
return False
for key, value in user_operators.items():
if key not in filter_operators or filter_operators[key] != value:
return False
return True
Details

✨ AI Reasoning
​The new function is_user_operators_subset_of checks whether the user_operators dict is non-empty and that each entry matches in filter_operators. Currently it iterates all items and uses a has_keys flag to detect emptiness, returning that at the end. An explicit early-return guard (if not user_operators: return False) would make the intent clearer and remove the auxiliary has_keys state, improving readability and maintainability.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info



def find_filter_part_with_operators(user_input, part_of_filter):
"""
This looks for parts in the filter that have NSQL operators (e.g. $)
Expand Down
102 changes: 102 additions & 0 deletions aikido_zen/vulnerabilities/nosql_injection/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,105 @@ def test_ignores_safe_pipeline_aggregations(create_context):
)
== {}
)


def test_detects_injection_when_app_merges_user_operators_with_own_dollar_keys(
create_context,
):
assert detect_nosql_injection(
create_context(body={"username": {"$ne": None}}),
{"title": {"$ne": None, "$exists": True}},
) == {
"injection": True,
"source": "body",
"pathToPayload": ".username",
"payload": {"$ne": None, "$exists": True},
}


def test_detects_injection_when_app_prepends_own_dollar_key_before_user_operators(
create_context,
):
assert detect_nosql_injection(
create_context(body={"username": {"$gt": ""}}),
{"title": {"$exists": True, "$gt": ""}},
) == {
"injection": True,
"source": "body",
"pathToPayload": ".username",
"payload": {"$exists": True, "$gt": ""},
}


def test_does_not_flag_when_user_operator_value_differs(create_context):
assert (
detect_nosql_injection(
create_context(body={"username": {"$ne": "different"}}),
{"title": {"$ne": None, "$exists": True}},
)
== {}
)


def test_detects_injection_when_app_adds_non_dollar_key_to_subobject(create_context):
assert detect_nosql_injection(
create_context(body={"field": {"$elemMatch": {"$gt": 5}}}),
{"items": {"$elemMatch": {"$gt": 5, "verified": True}}},
) == {
"injection": True,
"source": "body",
"pathToPayload": ".field.$elemMatch",
"payload": {"$gt": 5},
}


def test_does_not_flag_when_user_sends_empty_object(create_context):
assert (
detect_nosql_injection(
create_context(body={"username": {}}),
{"title": {"$exists": True}},
)
== {}
)


def test_does_not_flag_when_user_object_has_only_non_dollar_keys(create_context):
assert (
detect_nosql_injection(
create_context(body={"username": {"name": "alice"}}),
{"title": {"$exists": True}},
)
== {}
)


def test_does_not_flag_when_user_dollar_key_absent_from_filter(create_context):
assert (
detect_nosql_injection(
create_context(body={"username": {"$ne": None}}),
{"title": {"$exists": True}},
)
== {}
)


def test_does_not_flag_when_filter_uses_subset_of_user_operators(create_context):
assert (
detect_nosql_injection(
create_context(body={"username": {"$ne": None, "$gt": 0}}),
{"title": {"$ne": None}},
)
== {}
)


def test_does_not_flag_when_app_ignores_user_operators_and_uses_hardcoded_filter(
create_context,
):
assert (
detect_nosql_injection(
create_context(body={"username": {"$ne": None}}),
{"username": "hardcoded"},
)
== {}
)
Loading