diff --git a/CLI.md b/CLI.md index eeb21a0..9b8f7d4 100644 --- a/CLI.md +++ b/CLI.md @@ -943,6 +943,23 @@ secops alert --snapshot-query "feedback_summary.status != \"CLOSED\"" --time-win secops alert --baseline-query "detection.rule_name = \"My Rule\"" --time-window 24 ``` +### Rule Retrohunt Management + +List all retrohunts for a rule +```bash +secops rule-retrohunt list --rule-id "ru_abcdef" +``` + +Create a new retrohunt for a rule +```bash +secops rule-retrohunt create --rule-id "ru_abcdef" --start-time "2026-01-01T00:00:00Z" --end-time "2026-01-02T00:00:00Z" +``` + +Get specific retrohunt details +```bash +secops rule-retrohunt get --rule-id "ru_abcdef" --operation-id "oh_abcdef" +``` + ### Rule Exclusions Management Rule Exclusions allow you to exclude specific events from triggering detections in Chronicle. Use these commands to manage rule exclusions and their deployments: diff --git a/README.md b/README.md index 903728a..26951a1 100644 --- a/README.md +++ b/README.md @@ -2076,7 +2076,10 @@ operation_id = retrohunt.get("name", "").split("/")[-1] # Check retrohunt status retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id) -is_complete = retrohunt_status.get("metadata", {}).get("done", False) +state = retrohunt_status.get("state", "") + +# List retrohunts for a rule +retrohunts = chronicle.list_retrohunts(rule_id) ``` ### Detections and Errors diff --git a/api_module_mapping.md b/api_module_mapping.md index e56f424..1368316 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -40,9 +40,9 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ | rules.list | v1 | chronicle.rule.list_rules | secops rule list | | rules.listRevisions | v1 | | | | rules.patch | v1 | chronicle.rule.update_rule | secops rule update | -| rules.retrohunts.create | v1 | chronicle.rule_retrohunt.create_retrohunt | | -| rules.retrohunts.get | v1 | chronicle.rule_retrohunt.get_retrohunt | | -| rules.retrohunts.list | v1 | | | +| rules.retrohunts.create | v1 | chronicle.rule_retrohunt.create_retrohunt | secops rule-retrohunt create | +| rules.retrohunts.get | v1 | chronicle.rule_retrohunt.get_retrohunt | secops rule-retrohunt get | +| rules.retrohunts.list | v1 | chronicle.rule_retrohunt.list_retrohunts | secops rule-retrohunt list | | rules.updateDeployment | v1 | chronicle.rule.enable_rule | secops rule enable | | watchlists.create | v1 | chronicle.watchlist.create_watchlist | secops watchlist create | | watchlists.delete | v1 | chronicle.watchlist.delete_watchlist | secops watchlist delete | diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 2070547..f38fcf2 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -154,7 +154,11 @@ patch_rule_exclusion, update_rule_exclusion_deployment, ) -from secops.chronicle.rule_retrohunt import create_retrohunt, get_retrohunt +from secops.chronicle.rule_retrohunt import ( + create_retrohunt, + get_retrohunt, + list_retrohunts, +) from secops.chronicle.rule_set import ( batch_update_curated_rule_set_deployments, get_curated_rule, @@ -277,6 +281,7 @@ # Rule retrohunt operations "create_retrohunt", "get_retrohunt", + "list_retrohunts", # Rule set operations "batch_update_curated_rule_set_deployments", "list_curated_rule_sets", diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index b32b6ae..cdf63b2 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -266,8 +266,9 @@ ) from secops.chronicle.rule_retrohunt import ( create_retrohunt as _create_retrohunt, + get_retrohunt as _get_retrohunt, + list_retrohunts as _list_retrohunts, ) -from secops.chronicle.rule_retrohunt import get_retrohunt as _get_retrohunt from secops.chronicle.rule_set import ( batch_update_curated_rule_set_deployments as _batch_update_curated_rule_set_deployments, # pylint: disable=line-too-long ) @@ -2226,6 +2227,39 @@ def get_retrohunt( """ return _get_retrohunt(self, rule_id, operation_id, api_version) + def list_retrohunts( + self, + rule_id: str, + page_size: int | None = None, + page_token: str | None = None, + api_version: APIVersion | None = APIVersion.V1, + as_list: bool = False, + ) -> dict[str, Any] | list[dict[str, Any]]: + """Get a list of retrohunts for a rule. + + Args: + rule_id: Unique ID of the rule to list retrohunts for + page_size: Page size to use for paginated results + page_token: Page token to use for paginated results + api_version: Preferred API version to use. Defaults to V1 + as_list: Whether to return results as a list or dictionary + + Returns: + If as_list is True: List of retrohunts. + If as_list is False: Dict with retrohunts list and nextPageToken. + + Raises: + APIError: If the API request fails + """ + return _list_retrohunts( + self, + rule_id, + page_size, + page_token, + api_version, + as_list, + ) + # Parser Management methods def activate_parser( diff --git a/src/secops/chronicle/rule_retrohunt.py b/src/secops/chronicle/rule_retrohunt.py index 9d85a80..0f37ff2 100644 --- a/src/secops/chronicle/rule_retrohunt.py +++ b/src/secops/chronicle/rule_retrohunt.py @@ -15,14 +15,20 @@ """Retrohunt functionality for Chronicle rules.""" from datetime import datetime -from typing import Any +from typing import Any, TYPE_CHECKING from secops.chronicle.models import APIVersion -from secops.exceptions import APIError +from secops.chronicle.utils.request_utils import ( + chronicle_request, + chronicle_paginated_request, +) + +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient def create_retrohunt( - client, + client: "ChronicleClient", rule_id: str, start_time: datetime, end_time: datetime, @@ -46,11 +52,6 @@ def create_retrohunt( Raises: APIError: If the API request fails """ - url = ( - f"{client.base_url(api_version, list(APIVersion))}/" - f"{client.instance_id}/rules/{rule_id}/retrohunts" - ) - body = { "process_interval": { "start_time": start_time.isoformat(), @@ -58,16 +59,18 @@ def create_retrohunt( }, } - response = client.session.post(url, json=body) - - if response.status_code != 200: - raise APIError(f"Failed to create retrohunt: {response.text}") - - return response.json() + return chronicle_request( + client, + method="POST", + endpoint_path=f"rules/{rule_id}/retrohunts", + json=body, + api_version=api_version, + error_message="Failed to create retrohunt", + ) def get_retrohunt( - client, + client: "ChronicleClient", rule_id: str, operation_id: str, api_version: APIVersion | None = APIVersion.V1, @@ -87,14 +90,46 @@ def get_retrohunt( Raises: APIError: If the API request fails """ - url = ( - f"{client.base_url(api_version, list(APIVersion))}/" - f"{client.instance_id}/rules/{rule_id}/retrohunts/{operation_id}" + return chronicle_request( + client, + method="GET", + endpoint_path=f"rules/{rule_id}/retrohunts/{operation_id}", + api_version=api_version, + error_message="Failed to get retrohunt", ) - response = client.session.get(url) - if response.status_code != 200: - raise APIError(f"Failed to get retrohunt: {response.text}") +def list_retrohunts( + client: "ChronicleClient", + rule_id: str, + page_size: int | None = None, + page_token: str | None = None, + api_version: APIVersion | None = APIVersion.V1, + as_list: bool = False, +) -> dict[str, Any] | list[dict[str, Any]]: + """Get a list of retrohunts for a rule. + + Args: + client: ChronicleClient instance + rule_id: Unique ID of the rule to list retrohunts for + page_size: Page size to use for paginated results + page_token: Page token to use for paginated results + api_version: Preferred API version to use. Defaults to V1 + as_list: Whether to return results as a list or dictionary + + Returns: + If as_list is True: List of retrohunts. + If as_list is False: Dict with retrohunts list and nextPageToken. - return response.json() + Raises: + APIError: If the API request fails + """ + return chronicle_paginated_request( + client, + api_version=api_version, + path=f"rules/{rule_id}/retrohunts", + items_key="retrohunts", + page_size=page_size, + page_token=page_token, + as_list=as_list, + ) diff --git a/src/secops/chronicle/rule_validation.py b/src/secops/chronicle/rule_validation.py index 0ebc75b..795b65c 100644 --- a/src/secops/chronicle/rule_validation.py +++ b/src/secops/chronicle/rule_validation.py @@ -14,9 +14,15 @@ # """Rule validation functionality for Chronicle.""" -from typing import NamedTuple +from typing import NamedTuple, TYPE_CHECKING -from secops.exceptions import APIError +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_request, +) + +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient class ValidationResult(NamedTuple): @@ -34,7 +40,9 @@ class ValidationResult(NamedTuple): position: dict[str, int] | None = None -def validate_rule(client, rule_text: str) -> ValidationResult: +def validate_rule( + client: "ChronicleClient", rule_text: str +) -> ValidationResult: """Validates a YARA-L2 rule against the Chronicle API. Args: @@ -51,20 +59,20 @@ def validate_rule(client, rule_text: str) -> ValidationResult: Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}:verifyRuleText" - # Clean up the rule text by removing leading/trailing backticks and # whitespace cleaned_rule = rule_text.strip("` \n\t\r") body = {"ruleText": cleaned_rule} - response = client.session.post(url, json=body) - - if response.status_code != 200: - raise APIError(f"Failed to validate rule: {response.text}") - - result = response.json() + result = chronicle_request( + client, + method="POST", + endpoint_path=":verifyRuleText", + json=body, + api_version=APIVersion.V1ALPHA, + error_message="Failed to validate rule", + ) # Check if the response indicates success if result.get("success", False): diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 8510ef3..99b4630 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -78,6 +78,7 @@ def get_stats( api_version=APIVersion.V1ALPHA, params=params, timeout=timeout, + error_message="Failed to get stats", ) if "stats" not in results: diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index b397e0d..4c48365 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -38,6 +38,7 @@ from secops.cli.commands.stats import setup_stats_command from secops.cli.commands.udm_search import setup_udm_search_view_command from secops.cli.commands.watchlist import setup_watchlist_command +from secops.cli.commands.rule_retrohunt import setup_rule_retrohunt_command from secops.cli.utils.common_args import add_chronicle_args, add_common_args from secops.cli.utils.config_utils import load_config from secops.exceptions import AuthenticationError, SecOpsError @@ -187,6 +188,7 @@ def build_parser() -> argparse.ArgumentParser: setup_dashboard_command(subparsers) setup_dashboard_query_command(subparsers) setup_watchlist_command(subparsers) + setup_rule_retrohunt_command(subparsers) return parser diff --git a/src/secops/cli/commands/rule_retrohunt.py b/src/secops/cli/commands/rule_retrohunt.py new file mode 100644 index 0000000..a83cca8 --- /dev/null +++ b/src/secops/cli/commands/rule_retrohunt.py @@ -0,0 +1,122 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Google SecOps CLI rule retrohunt commands""" + +import sys + +from secops.cli.utils.formatters import output_formatter +from secops.cli.utils.common_args import ( + add_time_range_args, + add_pagination_args, + add_as_list_arg, +) +from secops.cli.utils.time_utils import parse_datetime + + +def setup_rule_retrohunt_command(subparsers): + """Setup rule retrohunt command""" + retrohunt_parser = subparsers.add_parser( + "rule-retrohunt", + help="Rule Retrohunt commands", + description="Rule Retrohunt commands", + ) + lvl1 = retrohunt_parser.add_subparsers( + dest="command", help="Rule Retrohunt command" + ) + + # list command + list_parser = lvl1.add_parser("list", help="List rule retrohunts") + add_pagination_args(list_parser) + add_as_list_arg(list_parser) + list_parser.add_argument( + "--rule-id", + type=str, + help="ID of the rule to get retrohunts for", + dest="rule_id", + required=True, + ) + list_parser.set_defaults(func=handle_retrohunt_list_command) + + # create command + create_parser = lvl1.add_parser("create", help="create rule retrohunt") + add_time_range_args(create_parser, required=True) + create_parser.add_argument( + "--rule-id", + type=str, + help="ID of the rule to create a retrohunt for", + dest="rule_id", + required=True, + ) + create_parser.set_defaults(func=handle_rule_retrohunt_create_command) + + # get command + get_parser = lvl1.add_parser("get", help="get a rule retrohunt") + get_parser.add_argument( + "--rule-id", + type=str, + help="ID of the rule to get retrohunt for", + dest="rule_id", + required=True, + ) + get_parser.add_argument( + "--operation-id", + type=str, + help="Operation ID of the retrohunt", + dest="operation_id", + required=True, + ) + get_parser.set_defaults(func=handle_get_retrohunt_command) + + +def handle_retrohunt_list_command(args, chronicle): + """List rule retrohunts""" + try: + out = chronicle.list_retrohunts( + rule_id=args.rule_id, + page_size=args.page_size, + page_token=args.page_token, + as_list=args.as_list, + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error listing rule retrohunts: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_rule_retrohunt_create_command(args, chronicle): + """Create rule retrohunt""" + try: + out = chronicle.create_retrohunt( + rule_id=args.rule_id, + start_time=parse_datetime(args.start_time), + end_time=parse_datetime(args.end_time), + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error creating rule retrohunt: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_get_retrohunt_command(args, chronicle): + """Get a rule retrohunt""" + try: + out = chronicle.get_retrohunt( + rule_id=args.rule_id, + operation_id=args.operation_id, + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error getting rule retrohunt: {e}", file=sys.stderr) + sys.exit(1) diff --git a/src/secops/cli/utils/common_args.py b/src/secops/cli/utils/common_args.py index 5c09f84..7490801 100644 --- a/src/secops/cli/utils/common_args.py +++ b/src/secops/cli/utils/common_args.py @@ -81,35 +81,46 @@ def add_chronicle_args(parser: argparse.ArgumentParser) -> None: ) -def add_time_range_args(parser: argparse.ArgumentParser) -> None: +def add_time_range_args( + parser: argparse.ArgumentParser, required: bool = False +) -> None: """Add time range arguments to a parser. Args: - parser: Parser to add arguments to + parser: Parser to add arguments to. + required: Whether a time range is required. """ config = load_config() + time_window_default = None if required else config.get("time_window", 24) - parser.add_argument( + group = parser.add_mutually_exclusive_group(required=required) + + group.add_argument( "--start-time", "--start_time", dest="start_time", default=config.get("start_time"), - help="Start time in ISO format (YYYY-MM-DDTHH:MM:SSZ)", + help="Start time in ISO format " + "(YYYY-MM-DDTHH:MM:SSZ). " + "Must be used with --end-time", + ) + group.add_argument( + "--time-window", + "--time_window", + dest="time_window", + type=int, + default=time_window_default, + help="Time window in hours " "(alternative to start/end time)", ) + parser.add_argument( "--end-time", "--end_time", dest="end_time", default=config.get("end_time"), - help="End time in ISO format (YYYY-MM-DDTHH:MM:SSZ)", - ) - parser.add_argument( - "--time-window", - "--time_window", - dest="time_window", - type=int, - default=config.get("time_window", 24), - help="Time window in hours (alternative to start/end time)", + help="End time in ISO format " + "(YYYY-MM-DDTHH:MM:SSZ). " + "Used with --start-time", ) diff --git a/tests/chronicle/test_rule_retrohunt.py b/tests/chronicle/test_rule_retrohunt.py new file mode 100644 index 0000000..e73e45d --- /dev/null +++ b/tests/chronicle/test_rule_retrohunt.py @@ -0,0 +1,195 @@ +"""Tests for Chronicle retrohunt functionality.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import Mock, patch + +import pytest + +from secops.chronicle.models import APIVersion +from secops.chronicle.rule_retrohunt import ( + create_retrohunt, + get_retrohunt, + list_retrohunts, +) + + +@pytest.fixture +def client() -> Mock: + return Mock() + + +def test_create_retrohunt_calls_chronicle_request(client: Mock) -> None: + start = datetime(2024, 1, 1, tzinfo=timezone.utc) + end = datetime(2024, 1, 2, tzinfo=timezone.utc) + + expected = {"name": "operations/op_123"} + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_request", + return_value=expected, + ) as req: + result = create_retrohunt( + client=client, + rule_id="ru_abc123", + start_time=start, + end_time=end, + ) + + assert result == expected + + req.assert_called_once() + _, kwargs = req.call_args + + assert kwargs["method"] == "POST" + assert kwargs["endpoint_path"] == "rules/ru_abc123/retrohunts" + assert kwargs["api_version"] == APIVersion.V1 + + body = kwargs["json"] + assert body["process_interval"]["start_time"] == start.isoformat() + assert body["process_interval"]["end_time"] == end.isoformat() + + +def test_get_retrohunt_calls_chronicle_request(client: Mock) -> None: + expected = { + "name": "operations/op_123", + "done": False, + } + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_request", + return_value=expected, + ) as req: + result = get_retrohunt( + client=client, + rule_id="ru_abc123", + operation_id="op_123", + ) + + assert result == expected + + req.assert_called_once() + _, kwargs = req.call_args + + assert kwargs["method"] == "GET" + assert kwargs["endpoint_path"] == "rules/ru_abc123/retrohunts/op_123" + assert kwargs["api_version"] == APIVersion.V1 + + +def test_list_retrohunts_dict_response(client: Mock) -> None: + expected = { + "retrohunts": [{"name": "rh1"}, {"name": "rh2"}], + "nextPageToken": "token123", + } + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_paginated_request", + return_value=expected, + ) as paged: + result = list_retrohunts( + client=client, + rule_id="ru_abc123", + page_size=50, + page_token="token123", + ) + + assert result == expected + + paged.assert_called_once() + _, kwargs = paged.call_args + + assert kwargs["api_version"] == APIVersion.V1 + assert kwargs["path"] == "rules/ru_abc123/retrohunts" + assert kwargs["items_key"] == "retrohunts" + assert kwargs["page_size"] == 50 + assert kwargs["page_token"] == "token123" + assert kwargs["as_list"] is False + + +def test_list_retrohunts_list_response(client: Mock) -> None: + expected = [{"name": "rh1"}, {"name": "rh2"}] + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_paginated_request", + return_value=expected, + ) as paged: + result = list_retrohunts( + client=client, + rule_id="ru_abc123", + as_list=True, + ) + + assert result == expected + + paged.assert_called_once() + _, kwargs = paged.call_args + + assert kwargs["path"] == "rules/ru_abc123/retrohunts" + assert kwargs["items_key"] == "retrohunts" + assert kwargs["as_list"] is True + + +def test_list_retrohunts_propagates_api_error(client: Mock) -> None: + with patch( + "secops.chronicle.rule_retrohunt.chronicle_paginated_request", + side_effect=Exception("boom"), + ): + with pytest.raises(Exception, match="boom"): + list_retrohunts(client=client, rule_id="ru_abc123") + + +def test_create_retrohunt_propagates_api_error(client: Mock) -> None: + start = datetime(2024, 1, 1, tzinfo=timezone.utc) + end = datetime(2024, 1, 2, tzinfo=timezone.utc) + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_request", + side_effect=Exception("API error"), + ): + with pytest.raises(Exception, match="API error"): + create_retrohunt( + client=client, + rule_id="ru_abc123", + start_time=start, + end_time=end, + ) + + +def test_get_retrohunt_propagates_api_error(client: Mock) -> None: + with patch( + "secops.chronicle.rule_retrohunt.chronicle_request", + side_effect=Exception("Not found"), + ): + with pytest.raises(Exception, match="Not found"): + get_retrohunt( + client=client, + rule_id="ru_abc123", + operation_id="op_123", + ) + + +def test_create_retrohunt_with_invalid_time_range( + client: Mock, +) -> None: + start = datetime(2024, 1, 2, tzinfo=timezone.utc) + end = datetime(2024, 1, 1, tzinfo=timezone.utc) + + expected = {"name": "operations/op_123"} + + with patch( + "secops.chronicle.rule_retrohunt.chronicle_request", + return_value=expected, + ) as req: + result = create_retrohunt( + client=client, + rule_id="ru_abc123", + start_time=start, + end_time=end, + ) + + assert result == expected + req.assert_called_once() + body = req.call_args[1]["json"] + assert body["process_interval"]["start_time"] == start.isoformat() + assert body["process_interval"]["end_time"] == end.isoformat() diff --git a/tests/chronicle/test_rule_validation.py b/tests/chronicle/test_rule_validation.py index 1e247a3..3840680 100644 --- a/tests/chronicle/test_rule_validation.py +++ b/tests/chronicle/test_rule_validation.py @@ -1,80 +1,26 @@ """Tests for Chronicle rule validation functions.""" -import pytest -from unittest.mock import Mock, patch -from secops.chronicle.client import ChronicleClient -from secops.chronicle.rule_validation import validate_rule -from secops.exceptions import APIError - - -@pytest.fixture -def chronicle_client(): - """Create a Chronicle client for testing.""" - with patch("secops.auth.SecOpsAuth") as mock_auth: - mock_session = Mock() - mock_session.headers = {} - mock_auth.return_value.session = mock_session - return ChronicleClient( - customer_id="test-customer", project_id="test-project" - ) - +from __future__ import annotations -@pytest.fixture -def mock_success_response(): - """Create a mock successful API response.""" - mock = Mock() - mock.status_code = 200 - mock.json.return_value = {"success": True} - return mock +from unittest.mock import Mock, patch +import pytest -@pytest.fixture -def mock_error_response(): - """Create a mock error API response.""" - mock = Mock() - mock.status_code = 200 - mock.json.return_value = { - "compilationDiagnostics": [ - { - "message": "semantic analysis: event variable e and its child variables not used in condition section", - "severity": "ERROR", - } - ] - } - return mock +from secops.chronicle.models import APIVersion +from secops.chronicle.rule_validation import ValidationResult, validate_rule +from secops.exceptions import APIError @pytest.fixture -def mock_error_with_position(): - """Create a mock error API response with position information.""" - mock = Mock() - mock.status_code = 200 - mock.json.return_value = { - "compilationDiagnostics": [ - { - "message": 'parsing: error with token: "+"\nexpected }\nline: 27 \ncolumn: 8-9 ', - "position": { - "startLine": 27, - "startColumn": 8, - "endLine": 27, - "endColumn": 9, - }, - "severity": "ERROR", - } - ] - } - return mock +def client() -> Mock: + return Mock() -def test_validate_rule_success(chronicle_client, mock_success_response): - """Test validate_rule function with successful validation.""" - # Arrange +def test_validate_rule_success(client: Mock) -> None: rule_text = """ rule test_rule { meta: author = "test" - description = "test rule" - severity = "Low" events: $e.metadata.event_type = "NETWORK_CONNECTION" condition: @@ -82,71 +28,95 @@ def test_validate_rule_success(chronicle_client, mock_success_response): } """ - with patch.object( - chronicle_client.session, "post", return_value=mock_success_response - ) as mock_post: - # Act - result = validate_rule(chronicle_client, rule_text) - - # Assert - mock_post.assert_called_once() - assert result.success is True - assert result.message is None - assert result.position is None - - -def test_validate_rule_error(chronicle_client, mock_error_response): - """Test validate_rule function with validation error.""" - # Arrange - rule_text = "invalid rule" - - with patch.object( - chronicle_client.session, "post", return_value=mock_error_response - ) as mock_post: - # Act - result = validate_rule(chronicle_client, rule_text) - - # Assert - mock_post.assert_called_once() - assert result.success is False - assert "semantic analysis" in result.message - assert result.position is None - - -def test_validate_rule_error_with_position(chronicle_client, mock_error_with_position): - """Test validate_rule function with validation error including position information.""" - # Arrange - rule_text = "invalid rule with position" - - with patch.object( - chronicle_client.session, "post", return_value=mock_error_with_position - ) as mock_post: - # Act - result = validate_rule(chronicle_client, rule_text) - - # Assert - mock_post.assert_called_once() - assert result.success is False - assert "parsing: error with token" in result.message - assert result.position is not None - assert result.position["startLine"] == 27 - assert result.position["startColumn"] == 8 - assert result.position["endLine"] == 27 - assert result.position["endColumn"] == 9 - - -def test_validate_rule_api_error(chronicle_client): - """Test validate_rule function with API error.""" - # Arrange - mock_error = Mock() - mock_error.status_code = 400 - mock_error.text = "API Error" - - with patch.object( - chronicle_client.session, "post", return_value=mock_error - ) as mock_post: - # Act & Assert - with pytest.raises(APIError) as exc_info: - validate_rule(chronicle_client, "rule text") - - assert "Failed to validate rule" in str(exc_info.value) + with patch( + "secops.chronicle.rule_validation.chronicle_request", + return_value={"success": True}, + ) as req: + result = validate_rule(client, rule_text) + + assert result == ValidationResult(success=True, message=None, position=None) + + req.assert_called_once() + _, kwargs = req.call_args + + assert kwargs["method"] == "POST" + assert kwargs["endpoint_path"] == ":verifyRuleText" + assert kwargs["api_version"] == APIVersion.V1ALPHA + + # Verify rule text cleanup happened (strip backticks/whitespace) + body = kwargs["json"] + assert "rule test_rule" in body["ruleText"] + assert not body["ruleText"].startswith("`") + assert not body["ruleText"].endswith("`") + + +def test_validate_rule_error_without_position(client: Mock) -> None: + with patch( + "secops.chronicle.rule_validation.chronicle_request", + return_value={ + "success": False, + "compilationDiagnostics": [ + { + "message": "semantic analysis: something went wrong", + "severity": "ERROR", + } + ], + }, + ): + result = validate_rule(client, "invalid rule") + + assert result.success is False + assert "semantic analysis" in (result.message or "") + assert result.position is None + + +def test_validate_rule_error_with_position(client: Mock) -> None: + with patch( + "secops.chronicle.rule_validation.chronicle_request", + return_value={ + "success": False, + "compilationDiagnostics": [ + { + "message": 'parsing: error with token: "+"', + "position": { + "startLine": 27, + "startColumn": 8, + "endLine": 27, + "endColumn": 9, + }, + "severity": "ERROR", + } + ], + }, + ): + result = validate_rule(client, "invalid rule with position") + + assert result.success is False + assert "parsing: error with token" in (result.message or "") + assert result.position == { + "startLine": 27, + "startColumn": 8, + "endLine": 27, + "endColumn": 9, + } + + +def test_validate_rule_unknown_error_when_no_diagnostics(client: Mock) -> None: + with patch( + "secops.chronicle.rule_validation.chronicle_request", + return_value={"success": False, "compilationDiagnostics": []}, + ): + result = validate_rule(client, "invalid rule") + + assert result.success is False + assert result.message == "Unknown validation error" + assert result.position is None + + +def test_validate_rule_propagates_api_error(client: Mock) -> None: + with patch( + "secops.chronicle.rule_validation.chronicle_request", + side_effect=APIError("API request failed: boom"), + ): + with pytest.raises(APIError, match=r"API request failed: boom"): + validate_rule(client, "rule text")