diff --git a/CLI.md b/CLI.md index eeb21a0..2555a5c 100644 --- a/CLI.md +++ b/CLI.md @@ -996,6 +996,8 @@ secops rule-exclusion compute-activity \ ### Case Management +Chronicle also provides comprehensive case management capabilities for tracking and managing security investigations. The CLI supports listing, retrieving, updating, and performing bulk operations on cases. + Get case details for specific case IDs: ```bash @@ -1013,7 +1015,74 @@ secops alert --time-window 24 --max-alerts 50 > alerts.json secops case --ids "case-123,case-456" ``` -> **Note**: The case management uses a batch API that can retrieve multiple cases in a single request. You can provide up to 1000 case IDs separated by commas. +> **Note**: You can provide up to 1000 case IDs separated by commas. + +#### List cases + +```bash +# List all cases with default pagination +secops case list --page-size 50 + +# List with filtering +secops case list --page-size 100 --filter 'status = "OPENED"' --order-by "createTime desc" + +# Get cases as a flat list instead of paginated dict +secops case list --page-size 50 --as-list +``` + +#### Get case details + +```bash +# Get a specific case by ID +secops case get --id "12345" + +# Get case with expanded fields +secops case get --id "12345" --expand "tags,products" + +# Legacy: Get multiple cases by IDs (batch API) +secops case --ids "case-123,case-456" +``` + +> **Note**: The legacy batch API can retrieve up to 1000 case IDs in a single request. + +#### Update a case + +```bash +# Update case priority +secops case update --id "12345" --data '{"priority": "PRIORITY_HIGH"}' --update-mask "priority" + +# Update multiple fields +secops case update --id "12345" --data '{"priority": "PRIORITY_MEDIUM", "stage": "Investigation"}' --update-mask "priority,stage" +``` + +#### Merge cases + +```bash +# Merge source cases into target case +secops case merge --source-ids "12345,67890" --target-id "11111" +``` + +#### Bulk operations + +```bash +# Bulk add tags to cases +secops case bulk-add-tag --ids "12345,67890" --tags "phishing,high-priority" + +# Bulk assign cases to a user +secops case bulk-assign --ids "12345,67890" --username "@SecurityTeam" + +# Bulk change priority +secops case bulk-change-priority --ids "12345,67890" --priority "HIGH" + +# Bulk change stage +secops case bulk-change-stage --ids "12345,67890" --stage "Remediation" + +# Bulk close cases +secops case bulk-close --ids "12345,67890" --close-reason "NOT_MALICIOUS" --root-cause "False positive - benign activity" + +# Bulk reopen cases +secops case bulk-reopen --ids "12345,67890" --reopen-comment "New evidence discovered" +``` ### Investigation Management diff --git a/README.md b/README.md index 903728a..4452e26 100644 --- a/README.md +++ b/README.md @@ -1403,6 +1403,134 @@ case = cases.get_case("case-id-1") > **Note**: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. You can retrieve up to 1000 cases in a single batch. +### Case Management + +Chronicle provides comprehensive case management capabilities for tracking and managing security investigations. The SDK supports listing, retrieving, updating, and performing bulk operations on cases. + +#### List cases + +Retrieve cases with optional filtering and pagination: + +```python +# List all cases with default pagination +result = chronicle.list_cases(page_size=50) +for case_data in result["cases"]: + case_id = case_data["name"].split("/")[-1] + print(f"Case {case_id}: {case_data['displayName']}") + +# List with filtering +open_cases = chronicle.list_cases( + page_size=100, + filter_query='status = "OPENED"', + order_by="createTime desc" +) + +# Get cases as a flat list instead of paginated dict +cases_list = chronicle.list_cases(page_size=50, as_list=True) +for case in cases_list: + print(f"{case['displayName']}: {case['priority']}") +``` + +#### Get case details + +Retrieve detailed information about a specific case: + +```python +# Get case by ID +case = chronicle.get_case("12345") +print(f"Case: {case.display_name}") +print(f"Priority: {case.priority}") +print(f"Status: {case.status}") +print(f"Stage: {case.stage}") + +# Get case with expanded fields +case_expanded = chronicle.get_case("12345", expand="tags,products") +``` + +#### Update a case + +Update case fields using partial updates: + +```python +# Update case priority +updated_case = chronicle.patch_case( + case_name="12345", + case_data={"priority": "PRIORITY_HIGH"}, + update_mask="priority" +) + +# Update multiple fields +updated_case = chronicle.patch_case( + case_name="12345", + case_data={ + "priority": "PRIORITY_MEDIUM", + "stage": "Investigation" + }, + update_mask="priority,stage" +) +``` + +#### Merge cases + +Merge multiple cases into a single target case: + +```python +# Merge source cases into target case +result = chronicle.merge_cases( + case_ids=[12345, 67890], + case_to_merge_with=11111 +) + +if result.get("isRequestValid"): + print(f"Cases merged into case {result['newCaseId']}") +else: + print(f"Merge failed: {result.get('errors')}") +``` + +#### Bulk operations + +Perform operations on multiple cases simultaneously: + +```python +# Bulk add tags +chronicle.execute_bulk_add_tag( + case_ids=[12345, 67890], + tags=["phishing", "high-priority"] +) + +# Bulk assign cases +chronicle.execute_bulk_assign( + case_ids=[12345, 67890], + username="@SecurityTeam" +) + +# Bulk change priority +chronicle.execute_bulk_change_priority( + case_ids=[12345, 67890], + priority="PRIORITY_HIGH" +) + +# Bulk change stage +chronicle.execute_bulk_change_stage( + case_ids=[12345, 67890], + stage="Remediation" +) + +# Bulk close cases +chronicle.execute_bulk_close( + case_ids=[12345, 67890], + close_reason="NOT_MALICIOUS", + root_cause="False positive - benign activity", + close_comment="Verified with asset owner" +) + +# Bulk reopen cases +chronicle.execute_bulk_reopen( + case_ids=[12345, 67890], + reopen_comment="New evidence discovered" +) +``` + ### Investigation Management Chronicle investigations provide automated analysis and recommendations for alerts and cases. The SDK provides methods to list, retrieve, trigger, and fetch associated investigations. diff --git a/api_module_mapping.md b/api_module_mapping.md index e56f424..f40b638 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -7,6 +7,7 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ ## Implementation Statistics - **v1:** 17 endpoints implemented +- **v1beta:** 10 endpoints implemented - **v1alpha:** 113 endpoints implemented ## Endpoint Mapping @@ -85,6 +86,16 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ | watchlists.get | v1beta | | | | watchlists.list | v1beta | | | | watchlists.patch | v1beta | | | +| cases.executeBulkAddTag | v1beta | chronicle.case.execute_bulk_add_tag | secops case bulk-add-tag | +| cases.executeBulkAssign | v1beta | chronicle.case.execute_bulk_assign | secops case bulk-assign | +| cases.executeBulkChangePriority | v1beta | chronicle.case.execute_bulk_change_priority | secops case bulk-change-priority | +| cases.executeBulkChangeStage | v1beta | chronicle.case.execute_bulk_change_stage | secops case bulk-change-stage | +| cases.executeBulkClose | v1beta | chronicle.case.execute_bulk_close | secops case bulk-close | +| cases.executeBulkReopen | v1beta | chronicle.case.execute_bulk_reopen | secops case bulk-reopen | +| cases.get | v1beta | chronicle.case.get_case | secops case get | +| cases.list | v1beta | chronicle.case.list_cases | secops case list | +| cases.merge | v1beta | chronicle.case.merge_cases | secops case merge | +| cases.patch | v1beta | chronicle.case.patch_case | secops case update | | analytics.entities.analyticValues.list | v1alpha | | | | analytics.list | v1alpha | | | | batchValidateWatchlistEntities | v1alpha | | | diff --git a/examples/case_management_example.py b/examples/case_management_example.py new file mode 100644 index 0000000..48fef2d --- /dev/null +++ b/examples/case_management_example.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Example demonstrating case management functionality with Chronicle.""" + +import argparse + +from secops import SecOpsClient +from secops.chronicle import CasePriority +from secops.exceptions import APIError + + +def list_cases_example(chronicle): + """Demonstrate listing cases with filtering and pagination. + + Args: + chronicle: Initialized Chronicle client + """ + print("\n=== Example 1: List Cases ===") + + try: + # List cases with basic pagination + result = chronicle.list_cases( + page_size=10, filter_query='priority="PRIORITY_HIGH"' + ) + + print(f"Total cases: {result['totalSize']}") + print(f"Cases in this page: {len(result['cases'])}") + + # Display first few cases + for i, case in enumerate(result["cases"][:3], 1): + print(f"\nCase {i}:") + print(f" ID: {case['name']}") + print(f" Display Name: {case['displayName']}") + print(f" Priority: {case['priority']}") + print(f" Stage: {case['stage']}") + print(f" Status: {case['status']}") + + # Check if there are more pages + if result["nextPageToken"]: + print(f"\nMore cases available (next page token exists)") + + except APIError as e: + print(f"Error listing cases: {e}") + + +def get_case_example(chronicle, case_id): + """Demonstrate getting a single case by ID. + + Args: + chronicle: Initialized Chronicle client + case_id: Case ID to retrieve + """ + print("\n=== Example 2: Get Single Case ===") + + try: + # Method 1: Using just the case ID (recommended - simpler) + print("\nUsing just case ID:") + case = chronicle.get_case(case_id, expand="tags,products") + + print(f"Case ID: {case.id}") + print(f"Display Name: {case.display_name}") + print(f"Priority: {case.priority}") + print(f"Stage: {case.stage}") + print(f"Status: {case.status}") + + if case.soar_platform_info: + print(f"SOAR Platform: " f"{case.soar_platform_info.platform_type}") + print(f"SOAR Case ID: {case.soar_platform_info.case_id}") + + except APIError as e: + print(f"Error retrieving case: {e}") + + +def patch_case_example(chronicle, case_id): + """Demonstrate updating a case using PATCH. + + Shows usage with just case ID for simplicity. + + Args: + chronicle: Initialized Chronicle client + case_id: Case ID to update + """ + print("\n=== Example 3: Update Case (PATCH) ===") + + try: + # Update specific fields using just the case ID + # Note: Priority values are automatically normalized to PRIORITY_ format + # You can use either "MEDIUM" or "PRIORITY_MEDIUM" + case_data = { + "priority": "MEDIUM", + "displayName": "Updated Case Name", + } + + updated_case = chronicle.patch_case( + case_id, case_data, update_mask="priority,displayName" + ) + + print(f"Case updated successfully!") + print(f"New Priority: {updated_case.priority}") + print(f"New Display Name: {updated_case.display_name}") + + except APIError as e: + print(f"Error updating case: {e}") + + +def bulk_add_tags_example(chronicle, case_ids): + """Demonstrate adding tags to multiple cases. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to add tags to + """ + print("\n=== Example 4: Bulk Add Tags ===") + + try: + tags = ["security-review", "high-priority"] + + result = chronicle.execute_bulk_add_tag(case_ids, tags) + + print(f"Successfully added tags {tags} to {len(case_ids)} cases") + print(f"Result: {result}") + + except APIError as e: + print(f"Error adding tags: {e}") + + +def bulk_assign_example(chronicle, case_ids, username): + """Demonstrate assigning multiple cases to a user. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to assign + username: Username to assign cases to + """ + print("\n=== Example 5: Bulk Assign Cases ===") + + try: + result = chronicle.execute_bulk_assign(case_ids, username) + + print(f"Successfully assigned {len(case_ids)} cases to {username}") + print(f"Result: {result}") + + except APIError as e: + print(f"Error assigning cases: {e}") + + +def bulk_change_priority_example(chronicle, case_ids): + """Demonstrate changing priority of multiple cases. + + Shows both string and enum usage. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to update + """ + print("\n=== Example 6: Bulk Change Priority ===") + + try: + # Example using enum (recommended) + print("\nUsing CasePriority enum:") + result = chronicle.execute_bulk_change_priority( + case_ids, CasePriority.HIGH + ) + + print( + f"Successfully changed priority to HIGH for " + f"{len(case_ids)} cases" + ) + print(f"Result: {result}") + + # Example using string (also supported) + print("\nUsing string value:") + result = chronicle.execute_bulk_change_priority( + case_ids, "PRIORITY_MEDIUM" + ) + + print( + f"Successfully changed priority to MEDIUM for " + f"{len(case_ids)} cases" + ) + print(f"Result: {result}") + + except APIError as e: + print(f"Error changing priority: {e}") + + +def bulk_change_stage_example(chronicle, case_ids): + """Demonstrate changing stage of multiple cases. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to update + """ + print("\n=== Example 7: Bulk Change Stage ===") + + try: + result = chronicle.execute_bulk_change_stage(case_ids, "Investigation") + + print( + f"Successfully changed stage to Investigation for " + f"{len(case_ids)} cases" + ) + print(f"Result: {result}") + + except APIError as e: + print(f"Error changing stage: {e}") + + +def bulk_close_example(chronicle, case_ids): + """Demonstrate closing multiple cases. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to close + """ + print("\n=== Example 8: Bulk Close Cases ===") + + try: + # Valid close_reason values: MALICIOUS, NOT_MALICIOUS, MAINTENANCE, + # INCONCLUSIVE, UNKNOWN, or CLOSE_REASON_UNSPECIFIED + result = chronicle.execute_bulk_close( + case_ids=case_ids, + close_reason="NOT_MALICIOUS", + root_cause="No threat detected", + close_comment="Closed after thorough investigation", + ) + + print(f"Successfully closed {len(case_ids)} cases") + print(f"Result: {result}") + + except APIError as e: + print(f"Error closing cases: {e}") + + +def bulk_reopen_example(chronicle, case_ids): + """Demonstrate reopening multiple cases. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to reopen + """ + print("\n=== Example 9: Bulk Reopen Cases ===") + + try: + result = chronicle.execute_bulk_reopen( + case_ids, "Reopening for additional investigation" + ) + + print(f"Successfully reopened {len(case_ids)} cases") + print(f"Result: {result}") + + except APIError as e: + print(f"Error reopening cases: {e}") + + +def merge_cases_example(chronicle, case_ids, target_case_id): + """Demonstrate merging multiple cases into one. + + Args: + chronicle: Initialized Chronicle client + case_ids: List of case IDs to merge + target_case_id: ID of the case to merge into + """ + print("\n=== Example 10: Merge Cases ===") + + try: + result = chronicle.merge_cases(case_ids, target_case_id) + + if result.get("isRequestValid"): + print(f"Successfully merged cases into case {target_case_id}") + print(f"New Case ID: {result.get('newCaseId')}") + else: + print(f"Merge request invalid") + print(f"Errors: {result.get('errors', [])}") + + except APIError as e: + print(f"Error merging cases: {e}") + + +def parse_case_ids(value): + """Parse comma-separated case IDs into list of integers.""" + try: + return [int(case_id.strip()) for case_id in value.split(",")] + except ValueError as e: + raise argparse.ArgumentTypeError( + f"Invalid case ID format: {value}" + ) from e + + +def main(): + """Run the example.""" + parser = argparse.ArgumentParser( + description="Example of case management with Chronicle" + ) + parser.add_argument( + "--customer_id", required=True, help="Chronicle instance ID" + ) + parser.add_argument("--project_id", required=True, help="GCP project ID") + parser.add_argument("--region", default="us", help="Chronicle API region") + parser.add_argument( + "--case_id", + help="Specific case ID for single case operations", + ) + parser.add_argument( + "--case_ids", + type=parse_case_ids, + help="Comma-separated list of case IDs for bulk operations " + "(e.g., 123,456,789)", + ) + parser.add_argument( + "--username", + help="Username for case assignment operations", + ) + + args = parser.parse_args() + + # Initialize the client + client = SecOpsClient() + + # Configure Chronicle client + chronicle = client.chronicle( + customer_id=args.customer_id, + project_id=args.project_id, + region=args.region, + ) + + # Run examples + print("=" * 60) + print("Chronicle Case Management Examples") + print("=" * 60) + + # Example 1: List cases + list_cases_example(chronicle) + + # Example 2: Get a single case (if case_id provided) + if args.case_id: + get_case_example(chronicle, args.case_id) + + # Example 3: Update a case (if case_id provided) + if args.case_id: + patch_case_example(chronicle, args.case_id) + + # Bulk operations (if case_ids provided) + if args.case_ids: + print(f"\nRunning bulk operations on {len(args.case_ids)} " f"cases") + + # Example 4: Add tags + bulk_add_tags_example(chronicle, args.case_ids) + + # Example 5: Assign cases (if username provided) + if args.username: + bulk_assign_example(chronicle, args.case_ids, args.username) + + # Example 6: Change priority + bulk_change_priority_example(chronicle, args.case_ids) + + # Example 7: Change stage + bulk_change_stage_example(chronicle, args.case_ids) + + # Example 8: Close cases + bulk_close_example(chronicle, args.case_ids) + + # Example 9: Reopen cases + bulk_reopen_example(chronicle, args.case_ids) + + # Example 10: Merge cases (use first as target) + if len(args.case_ids) > 1: + target = args.case_ids[0] + to_merge = args.case_ids[1:] + merge_cases_example(chronicle, to_merge, target) + + print("\n" + "=" * 60) + print("Examples completed!") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 2070547..8dc8aa6 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -15,7 +15,20 @@ """Chronicle API specific functionality.""" from secops.chronicle.alert import get_alerts -from secops.chronicle.case import get_cases +from secops.chronicle.case import ( + execute_bulk_add_tag, + execute_bulk_assign, + execute_bulk_change_priority, + execute_bulk_change_stage, + execute_bulk_close, + execute_bulk_reopen, + get_case, + get_cases, + list_cases, + merge_cases, + patch_case, +) +from secops.chronicle.models import CaseCloseReason, CasePriority from secops.chronicle.client import ( ChronicleClient, ValueType, @@ -220,6 +233,18 @@ "trigger_investigation", # Case "get_cases", + "get_case", + "list_cases", + "patch_case", + "merge_cases", + "execute_bulk_add_tag", + "execute_bulk_assign", + "execute_bulk_change_priority", + "execute_bulk_change_stage", + "execute_bulk_close", + "execute_bulk_reopen", + "CasePriority", + "CaseCloseReason", # Alert "get_alerts", # Log Ingestion diff --git a/src/secops/chronicle/case.py b/src/secops/chronicle/case.py index 81994a6..eb1ff89 100644 --- a/src/secops/chronicle/case.py +++ b/src/secops/chronicle/case.py @@ -17,8 +17,17 @@ from datetime import datetime from typing import Any -from secops.chronicle.models import Case, CaseList -from secops.exceptions import APIError +from secops.chronicle.models import ( + APIVersion, + Case, + CaseCloseReason, + CaseList, + CasePriority, +) +from secops.chronicle.utils.request_utils import ( + chronicle_paginated_request, + chronicle_request, +) def get_cases( @@ -49,11 +58,8 @@ def get_cases( Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/legacy:legacyListCases" + params: dict[str, Any] = {"pageSize": str(page_size)} - params = {"pageSize": str(page_size)} - - # Add optional parameters if page_token: params["pageToken"] = page_token @@ -78,20 +84,14 @@ def get_cases( if tenant_id: params["tenantId"] = tenant_id - response = client.session.get(url, params=params) - - if response.status_code != 200: - raise APIError(f"Failed to retrieve cases: {response.text}") - - try: - data = response.json() - - return { - "cases": data.get("cases", []), - "next_page_token": data.get("nextPageToken", ""), - } - except ValueError as e: - raise APIError(f"Failed to parse cases response: {str(e)}") from e + return chronicle_request( + client, + method="GET", + endpoint_path="legacy:legacyListCases", + api_version=APIVersion.V1ALPHA, + params=params, + error_message="Failed to retrieve cases", + ) def get_cases_from_list(client, case_ids: list[str]) -> CaseList: @@ -108,27 +108,429 @@ def get_cases_from_list(client, case_ids: list[str]) -> CaseList: APIError: If the API request fails ValueError: If too many case IDs are provided """ - # Check that we don't exceed the maximum number of cases if len(case_ids) > 1000: raise ValueError("Maximum of 1000 cases can be retrieved in a batch") - url = f"{client.base_url}/{client.instance_id}/legacy:legacyBatchGetCases" + data = chronicle_request( + client, + method="GET", + endpoint_path="legacy:legacyBatchGetCases", + api_version=APIVersion.V1ALPHA, + params={"names": case_ids}, + error_message="Failed to get cases", + ) - params = {"names": case_ids} + cases = [] + if "cases" in data: + for case_data in data["cases"]: + cases.append(Case.from_dict(case_data)) - response = client.session.get(url, params=params) + return CaseList(cases) - if response.status_code != 200: - raise APIError(f"Failed to get cases: {response.text}") - # Parse the response - cases = [] - response_data = response.json() +def execute_bulk_add_tag( + client, case_ids: list[int], tags: list[str] +) -> dict[str, Any]: + """Add tags to multiple cases in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to add tags to + tags: List of tags to add to the cases - if "cases" in response_data: - for case_data in response_data["cases"]: - # Create Case object - case = Case.from_dict(case_data) - cases.append(case) + Returns: + Empty dictionary on success - return CaseList(cases) + Raises: + APIError: If the API request fails + """ + body = {"casesIds": case_ids, "tags": tags} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkAddTag", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to add tags to cases", + ) + + +def execute_bulk_assign( + client, case_ids: list[int], username: str +) -> dict[str, Any]: + """Assign multiple cases to a user in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to assign + username: Username to assign the cases to + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + body = {"casesIds": case_ids, "username": username} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkAssign", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to assign cases", + ) + + +def execute_bulk_change_priority( + client, case_ids: list[int], priority: str | CasePriority +) -> dict[str, Any]: + """Change priority of multiple cases in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to change priority for + priority: Priority level. + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + if isinstance(priority, str): + original_priority_value = priority + try: + priority = CasePriority[original_priority_value] + except KeyError: + try: + priority = CasePriority(original_priority_value) + except ValueError as ve: + valid_values = ", ".join([p.name for p in CasePriority]) + raise ValueError( + f"Invalid priority '{priority}'. " + f"Valid values: {valid_values}" + ) from ve + + body = {"casesIds": case_ids, "priority": priority} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkChangePriority", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to change case priority", + ) + + +def execute_bulk_change_stage( + client, case_ids: list[int], stage: str +) -> dict[str, Any]: + """Change stage of multiple cases in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to change stage for + stage: Stage to set for the cases + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + body = {"casesIds": case_ids, "stage": stage} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkChangeStage", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to change case stage", + ) + + +def execute_bulk_close( + client, + case_ids: list[int], + close_reason: str | CaseCloseReason, + root_cause: str | None = None, + close_comment: str | None = None, + dynamic_parameters: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + """Close multiple cases in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to close + close_reason: Reason for closing the cases. + Can be CaseCloseReason enum or string. + Valid values: MALICIOUS, NOT_MALICIOUS, MAINTENANCE, + INCONCLUSIVE, UNKNOWN, CLOSE_REASON_UNSPECIFIED + root_cause: Optional root cause for closing cases + close_comment: Optional comment to add when closing + dynamic_parameters: Optional dynamic parameters for close action + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + ValueError: If an invalid close_reason value is provided + """ + if isinstance(close_reason, str): + original_close_reason = close_reason + try: + close_reason = CaseCloseReason[original_close_reason] + except KeyError: + try: + close_reason = CaseCloseReason(original_close_reason) + except ValueError as ve: + valid_values = ", ".join([r.name for r in CaseCloseReason]) + raise ValueError( + f"Invalid close_reason '{close_reason}'. " + f"Valid values: {valid_values}" + ) from ve + + body: dict[str, Any] = { + "casesIds": case_ids, + "closeReason": close_reason, + } + + if root_cause is not None: + body["rootCause"] = root_cause + if close_comment is not None: + body["closeComment"] = close_comment + if dynamic_parameters is not None: + body["dynamicParameters"] = dynamic_parameters + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkClose", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to close cases", + ) + + +def execute_bulk_reopen( + client, case_ids: list[int], reopen_comment: str +) -> dict[str, Any]: + """Reopen multiple cases in bulk. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to reopen + reopen_comment: Comment to add when reopening cases + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + body = {"casesIds": case_ids, "reopenComment": reopen_comment} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:executeBulkReopen", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to reopen cases", + ) + + +def get_case(client, case_name: str, expand: str | None = None) -> Case: + """Get a single case details. + + Args: + client: ChronicleClient instance + case_name: Case resource name or case ID. + Full format: projects/{project}/locations/{location}/ + instances/{instance}/cases/{case} + Short format: {case_id} (e.g., "12345") + expand: Optional expand field for getting related resources + + Returns: + Case object with case details + + Raises: + APIError: If the API request fails + """ + if not case_name.startswith("projects/"): + endpoint_path = f"cases/{case_name}" + else: + endpoint_path = case_name + + params: dict[str, Any] = {} + if expand: + params["expand"] = expand + + data = chronicle_request( + client, + method="GET", + endpoint_path=endpoint_path, + api_version=APIVersion.V1BETA, + params=params, + error_message="Failed to get case", + ) + + return Case.from_dict(data) + + +def list_cases( + client, + page_size: int | None = None, + page_token: str | None = None, + filter_query: str | None = None, + order_by: str | None = None, + expand: str | None = None, + distinct_by: str | None = None, + as_list: bool = False, +) -> list[dict[str, Any]] | dict[str, Any]: + """List cases with optional filtering and pagination. + + Args: + client: ChronicleClient instance + page_size: Maximum number of cases to return per page (1-1000). + If None, automatically paginates through all results. + page_token: Token for pagination from previous list call. + filter_query: Filter expression for filtering cases + order_by: Comma-separated list of fields to order by + expand: Expand fields (e.g., "tags, products") + distinct_by: Field to distinct cases by + as_list: If True, return a list of cases instead of a dict + with cases list, nextPageToken, and totalSize. + + Returns: + If as_list is True: A list of case dictionaries. + If as_list is False: A dictionary containing: + - cases: List of case dictionaries + - nextPageToken: Token for next page (empty if auto-paginated) + - totalSize: Total number of matching cases + + Raises: + APIError: If the API request fails + """ + extra_params: dict[str, Any] = {} + if filter_query: + extra_params["filter"] = filter_query + if order_by: + extra_params["orderBy"] = order_by + if expand: + extra_params["expand"] = expand + if distinct_by: + extra_params["distinctBy"] = distinct_by + + return chronicle_paginated_request( + client, + api_version=APIVersion.V1BETA, + path="cases", + items_key="cases", + page_size=page_size, + page_token=page_token, + extra_params=extra_params if extra_params else None, + as_list=as_list, + ) + + +def merge_cases( + client, case_ids: list[int], case_to_merge_with: int +) -> dict[str, Any]: + """Merge multiple cases into a single case. + + Args: + client: ChronicleClient instance + case_ids: List of case IDs to merge (source cases) + case_to_merge_with: ID of the target case to merge into + + Returns: + Dictionary containing: + - newCaseId: ID of the merged case if successful + - isRequestValid: Whether the request was valid + - errors: List of errors if request was invalid + + Raises: + APIError: If the API request fails + + Note: + The API requires all cases (including target) in casesIds. + The target case is specified separately in caseToMergeWith. + """ + all_case_ids = list(set(case_ids + [case_to_merge_with])) + body = {"casesIds": all_case_ids, "caseToMergeWith": case_to_merge_with} + + return chronicle_request( + client, + method="POST", + endpoint_path="cases:merge", + api_version=APIVersion.V1BETA, + json=body, + error_message="Failed to merge cases", + ) + + +def patch_case( + client, + case_name: str, + case_data: dict[str, Any], + update_mask: str | None = None, +) -> Case: + """Update a case using partial update (PATCH). + + Args: + client: ChronicleClient instance + case_name: Case resource name or case ID. + Full format: projects/{project}/locations/{location}/ + instances/{instance}/cases/{case} + Short format: {case_id} (e.g., "12345") + case_data: Dictionary containing case fields to update. + update_mask: Optional comma-separated list of fields to update + + Returns: + Updated Case object + + Raises: + APIError: If the API request fails + ValueError: If an invalid priority value is provided + """ + if not case_name.startswith("projects/"): + endpoint_path = f"cases/{case_name}" + else: + endpoint_path = case_name + + if "priority" in case_data and isinstance(case_data["priority"], str): + case_priority = case_data["priority"] + try: + case_data["priority"] = CasePriority[case_priority] + except KeyError: + try: + case_data["priority"] = CasePriority(case_priority) + except ValueError as ve: + valid_values = ", ".join([p.name for p in CasePriority]) + raise ValueError( + f"Invalid priority '{case_data['priority']}'. " + f"Valid values: {valid_values}" + ) from ve + + params: dict[str, Any] = {} + if update_mask: + params["updateMask"] = update_mask + + data = chronicle_request( + client, + method="PATCH", + endpoint_path=endpoint_path, + api_version=APIVersion.V1BETA, + json=case_data, + params=params if params else None, + error_message="Failed to patch case", + ) + + return Case.from_dict(data) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index b32b6ae..1c0047e 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -25,7 +25,21 @@ from secops import auth as secops_auth from secops.auth import RetryConfig from secops.chronicle.alert import get_alerts as _get_alerts +from secops.chronicle.case import execute_bulk_add_tag as _execute_bulk_add_tag +from secops.chronicle.case import execute_bulk_assign as _execute_bulk_assign +from secops.chronicle.case import ( + execute_bulk_change_priority as _execute_bulk_change_priority, +) +from secops.chronicle.case import ( + execute_bulk_change_stage as _execute_bulk_change_stage, +) +from secops.chronicle.case import execute_bulk_close as _execute_bulk_close +from secops.chronicle.case import execute_bulk_reopen as _execute_bulk_reopen +from secops.chronicle.case import get_case as _get_case from secops.chronicle.case import get_cases_from_list +from secops.chronicle.case import list_cases as _list_cases +from secops.chronicle.case import merge_cases as _merge_cases +from secops.chronicle.case import patch_case as _patch_case from secops.chronicle.dashboard import DashboardAccessType, DashboardView from secops.chronicle.dashboard import add_chart as _add_chart from secops.chronicle.dashboard import create_dashboard as _create_dashboard @@ -159,7 +173,9 @@ ) from secops.chronicle.models import ( APIVersion, + CaseCloseReason, CaseList, + CasePriority, DashboardChart, DashboardQuery, EntitySummary, @@ -1031,6 +1047,227 @@ def get_cases(self, case_ids: list[str]) -> CaseList: """ return get_cases_from_list(self, case_ids) + def get_case(self, case_name: str, expand: str | None = None) -> "Case": + """Get a single case details. + + Args: + case_name: Case resource name or case ID. + expand: Optional expand field for getting related resources + + Returns: + Case object with case details + + Raises: + APIError: If the API request fails + """ + return _get_case(self, case_name, expand) + + def list_cases( + self, + page_size: int | None = None, + page_token: str | None = None, + filter_query: str | None = None, + order_by: str | None = None, + expand: str | None = None, + distinct_by: str | None = None, + as_list: bool = False, + ) -> list[dict[str, Any]] | dict[str, Any]: + """List cases with optional filtering and pagination. + + Args: + page_size: Maximum number of cases to return per page (1-1000). + If None, automatically paginates and returns all results. + page_token: Token for pagination from previous list call. + Only used when page_size is provided. + filter_query: Filter expression for filtering cases + order_by: Comma-separated list of fields to order by + expand: Expand fields (e.g., "tags, products") + distinct_by: Field to distinct cases by + as_list: If True, return a list of cases instead of a dict + with cases list, nextPageToken, and totalSize. + + Returns: + If as_list is True: A list of case dictionaries. + If as_list is False: A dictionary with cases, nextPageToken, + and totalSize. + + Raises: + APIError: If the API request fails + ValueError: If page_size is invalid + """ + return _list_cases( + self, + page_size, + page_token, + filter_query, + order_by, + expand, + distinct_by, + as_list, + ) + + def patch_case( + self, + case_name: str, + case_data: dict[str, Any], + update_mask: str | None = None, + ) -> "Case": + """Update a case using partial update (PATCH). + + Args: + case_name: Case resource name or case ID. + case_data: Dictionary containing case fields to update + update_mask: Optional comma-separated list of fields to update + + Returns: + Updated Case object + + Raises: + APIError: If the API request fails + """ + return _patch_case(self, case_name, case_data, update_mask) + + def merge_cases( + self, case_ids: list[int], case_to_merge_with: int + ) -> dict[str, Any]: + """Merge multiple cases into a single case. + + Args: + case_ids: List of case IDs to merge + case_to_merge_with: ID of the case to merge with + + Returns: + Dictionary with newCaseId, isRequestValid, and errors + + Raises: + APIError: If the API request fails + """ + return _merge_cases(self, case_ids, case_to_merge_with) + + def execute_bulk_add_tag( + self, case_ids: list[int], tags: list[str] + ) -> dict[str, Any]: + """Add tags to multiple cases in bulk. + + Args: + case_ids: List of case IDs to add tags to + tags: List of tags to add to the cases + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + return _execute_bulk_add_tag(self, case_ids, tags) + + def execute_bulk_assign( + self, case_ids: list[int], username: str + ) -> dict[str, Any]: + """Assign multiple cases to a user in bulk. + + Args: + case_ids: List of case IDs to assign + username: Username to assign the cases to + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + return _execute_bulk_assign(self, case_ids, username) + + def execute_bulk_change_priority( + self, case_ids: list[int], priority: str | CasePriority + ) -> dict[str, Any]: + """Change priority of multiple cases in bulk. + + Args: + case_ids: List of case IDs to change priority for + priority: Priority level (CasePriority enum or string). + Valid values: PRIORITY_UNSPECIFIED, PRIORITY_INFO, + PRIORITY_LOW, PRIORITY_MEDIUM, PRIORITY_HIGH, + PRIORITY_CRITICAL + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + return _execute_bulk_change_priority(self, case_ids, priority) + + def execute_bulk_change_stage( + self, case_ids: list[int], stage: str + ) -> dict[str, Any]: + """Change stage of multiple cases in bulk. + + Args: + case_ids: List of case IDs to change stage for + stage: Stage to set for the cases + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + return _execute_bulk_change_stage(self, case_ids, stage) + + def execute_bulk_close( + self, + case_ids: list[int], + close_reason: str | CaseCloseReason, + root_cause: str | None = None, + close_comment: str | None = None, + dynamic_parameters: list[dict[str, Any]] | None = None, + ) -> dict[str, Any]: + """Close multiple cases in bulk. + + Args: + case_ids: List of case IDs to close + close_reason: Reason for closing the cases. + Can be CaseCloseReason enum or string. + Valid values: MALICIOUS, NOT_MALICIOUS, MAINTENANCE, + INCONCLUSIVE, UNKNOWN, CLOSE_REASON_UNSPECIFIED + root_cause: Optional root cause for closing cases + close_comment: Optional comment to add when closing + dynamic_parameters: Optional dynamic parameters for close + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + ValueError: If an invalid close_reason value is provided + """ + return _execute_bulk_close( + self, + case_ids, + close_reason, + root_cause, + close_comment, + dynamic_parameters, + ) + + def execute_bulk_reopen( + self, case_ids: list[int], reopen_comment: str + ) -> dict[str, Any]: + """Reopen multiple cases in bulk. + + Args: + case_ids: List of case IDs to reopen + reopen_comment: Comment to add when reopening cases + + Returns: + Empty dictionary on success + + Raises: + APIError: If the API request fails + """ + return _execute_bulk_reopen(self, case_ids, reopen_comment) + def get_alerts( self, start_time: datetime, @@ -3700,7 +3937,7 @@ def update_data_table_rows( (format: projects/{project}/locations/{location}/ instances/{instance}/dataTables/{table}/ dataTableRows/{row_id}) - - 'values': List[str] - The new values for the row + - 'values': list[str] - The new values for the row - 'update_mask': str (optional) - Comma-separated list of fields to update (e.g., 'values'). If not specified, all fields are updated. diff --git a/src/secops/chronicle/models.py b/src/secops/chronicle/models.py index 0074bc5..5db56d2 100644 --- a/src/secops/chronicle/models.py +++ b/src/secops/chronicle/models.py @@ -73,6 +73,28 @@ class DetectionType(StrEnum): CASE = "DETECTION_TYPE_CASE" +class CasePriority(StrEnum): + """Priority levels for cases.""" + + UNSPECIFIED = "PRIORITY_UNSPECIFIED" + INFO = "PRIORITY_INFO" + LOW = "PRIORITY_LOW" + MEDIUM = "PRIORITY_MEDIUM" + HIGH = "PRIORITY_HIGH" + CRITICAL = "PRIORITY_CRITICAL" + + +class CaseCloseReason(StrEnum): + """Close reason values for cases.""" + + UNSPECIFIED = "CLOSE_REASON_UNSPECIFIED" + MALICIOUS = "MALICIOUS" + NOT_MALICIOUS = "NOT_MALICIOUS" + MAINTENANCE = "MAINTENANCE" + INCONCLUSIVE = "INCONCLUSIVE" + UNKNOWN = "UNKNOWN" + + @dataclass class TimeInterval: """Time interval with start and end times.""" diff --git a/src/secops/cli/commands/case.py b/src/secops/cli/commands/case.py index 49c430e..c3d31b6 100644 --- a/src/secops/cli/commands/case.py +++ b/src/secops/cli/commands/case.py @@ -14,51 +14,456 @@ # """Google SecOps CLI case commands""" +import json import sys +from secops.cli.utils.common_args import add_pagination_args from secops.cli.utils.formatters import output_formatter +from secops.exceptions import APIError def setup_case_command(subparsers): """Set up the case command parser.""" case_parser = subparsers.add_parser("case", help="Manage cases") + + # Legacy argument for backward compatibility case_parser.add_argument("--ids", help="Comma-separated list of case IDs") case_parser.set_defaults(func=handle_case_command) + case_subparsers = case_parser.add_subparsers( + dest="case_command", help="Case command" + ) + + # Get single case command + get_parser = case_subparsers.add_parser( + "get", help="Get a single case by ID" + ) + get_parser.add_argument( + "--id", required=True, help="Case ID or resource name" + ) + get_parser.add_argument( + "--expand", help="Expand fields (e.g., 'tags,products')" + ) + get_parser.set_defaults(func=handle_case_get_command) + + # List cases command + list_parser = case_subparsers.add_parser( + "list", help="List cases with filtering" + ) + add_pagination_args(list_parser) + list_parser.add_argument( + "--filter", help="Filter expression for filtering cases" + ) + list_parser.add_argument( + "--order-by", + "--order_by", + dest="order_by", + help="Comma-separated list of fields to order by", + ) + list_parser.add_argument( + "--expand", help="Expand fields (e.g., 'tags,products')" + ) + list_parser.add_argument( + "--distinct-by", + "--distinct_by", + dest="distinct_by", + help="Field to distinct cases by", + ) + list_parser.add_argument( + "--as-list", + "--as_list", + dest="as_list", + action="store_true", + help="Return list of cases instead of dict with metadata", + ) + list_parser.set_defaults(func=handle_case_list_command) + + # Update case command + update_parser = case_subparsers.add_parser("update", help="Update a case") + update_parser.add_argument( + "--id", required=True, help="Case ID or resource name" + ) + update_parser.add_argument( + "--data", + required=True, + help="JSON string with case fields to update", + ) + update_parser.add_argument( + "--update-mask", + "--update_mask", + dest="update_mask", + help="Comma-separated list of fields to update", + ) + update_parser.set_defaults(func=handle_case_patch_command) + + # Merge cases command + merge_parser = case_subparsers.add_parser( + "merge", help="Merge multiple cases into one" + ) + merge_parser.add_argument( + "--source-ids", + "--source_ids", + dest="source_ids", + required=True, + help="Comma-separated list of case IDs to merge (source cases)", + ) + merge_parser.add_argument( + "--target-id", + "--target_id", + dest="target_id", + required=True, + type=int, + help="Case ID to merge into (target case)", + ) + merge_parser.set_defaults(func=handle_case_merge_command) + + # Bulk add tag command + bulk_add_tag_parser = case_subparsers.add_parser( + "bulk-add-tag", help="Add tags to multiple cases" + ) + bulk_add_tag_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_add_tag_parser.add_argument( + "--tags", + required=True, + help="Comma-separated list of tags to add", + ) + bulk_add_tag_parser.set_defaults(func=handle_case_bulk_add_tag_command) + + # Bulk assign command + bulk_assign_parser = case_subparsers.add_parser( + "bulk-assign", help="Assign multiple cases to a user" + ) + bulk_assign_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_assign_parser.add_argument( + "--username", required=True, help="Username to assign cases to" + ) + bulk_assign_parser.set_defaults(func=handle_case_bulk_assign_command) + + # Bulk change priority command + bulk_priority_parser = case_subparsers.add_parser( + "bulk-change-priority", help="Change priority of multiple cases" + ) + bulk_priority_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_priority_parser.add_argument( + "--priority", + required=True, + choices=[ + "UNSPECIFIED", + "INFO", + "LOW", + "MEDIUM", + "HIGH", + "CRITICAL", + "PRIORITY_UNSPECIFIED", + "PRIORITY_INFO", + "PRIORITY_LOW", + "PRIORITY_MEDIUM", + "PRIORITY_HIGH", + "PRIORITY_CRITICAL", + ], + help="Priority level to set", + ) + bulk_priority_parser.set_defaults( + func=handle_case_bulk_change_priority_command + ) + + # Bulk change stage command + bulk_stage_parser = case_subparsers.add_parser( + "bulk-change-stage", help="Change stage of multiple cases" + ) + bulk_stage_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_stage_parser.add_argument( + "--stage", required=True, help="Stage to set for the cases" + ) + bulk_stage_parser.set_defaults(func=handle_case_bulk_change_stage_command) + + # Bulk close command + bulk_close_parser = case_subparsers.add_parser( + "bulk-close", help="Close multiple cases" + ) + bulk_close_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_close_parser.add_argument( + "--close-reason", + "--close_reason", + dest="close_reason", + required=True, + choices=[ + "UNSPECIFIED", + "MALICIOUS", + "NOT_MALICIOUS", + "MAINTENANCE", + "INCONCLUSIVE", + "UNKNOWN", + "CLOSE_REASON_UNSPECIFIED", + ], + help="Reason for closing the cases", + ) + bulk_close_parser.add_argument( + "--root-cause", + "--root_cause", + dest="root_cause", + help="Root cause for closing cases", + ) + bulk_close_parser.add_argument( + "--close-comment", + "--close_comment", + dest="close_comment", + help="Comment to add when closing", + ) + bulk_close_parser.set_defaults(func=handle_case_bulk_close_command) + + # Bulk reopen command + bulk_reopen_parser = case_subparsers.add_parser( + "bulk-reopen", help="Reopen multiple cases" + ) + bulk_reopen_parser.add_argument( + "--ids", + required=True, + help="Comma-separated list of case IDs (integers)", + ) + bulk_reopen_parser.add_argument( + "--reopen-comment", + "--reopen_comment", + dest="reopen_comment", + required=True, + help="Comment to add when reopening cases", + ) + bulk_reopen_parser.set_defaults(func=handle_case_bulk_reopen_command) + def handle_case_command(args, chronicle): - """Handle case command.""" + """Handle case command (legacy behavior with --ids).""" + # If --ids is provided without subcommand, use legacy behavior + if args.ids and not args.case_command: + handle_case_get_batch_command(args, chronicle) + elif not args.case_command: + # No subcommand and no --ids, show help + print("Error: No subcommand or --ids provided", file=sys.stderr) + sys.exit(1) + + +def handle_case_get_batch_command(args, chronicle): + """Handle case get-batch command.""" try: - if args.ids: - case_ids = [id.strip() for id in args.ids.split(",")] - result = chronicle.get_cases(case_ids) - - # Convert CaseList to dictionary for output - cases_dict = { - "cases": [ - { - "id": case.id, - "display_name": case.display_name, - "stage": case.stage, - "priority": case.priority, - "status": case.status, - "soar_platform_info": ( - { - "case_id": case.soar_platform_info.case_id, - "platform_type": case.soar_platform_info.platform_type, # pylint: disable=line-too-long - } - if case.soar_platform_info - else None - ), - "alert_ids": case.alert_ids, - } - for case in result.cases - ] - } - output_formatter(cases_dict, args.output) - else: - print("Error: No case IDs provided", file=sys.stderr) - sys.exit(1) + case_ids = [case_id.strip() for case_id in args.ids.split(",")] + result = chronicle.get_cases(case_ids) + + # Convert CaseList to dictionary for output + cases_dict = { + "cases": [ + { + "id": case.id, + "display_name": case.display_name, + "stage": case.stage, + "priority": case.priority, + "status": case.status, + "soar_platform_info": ( + { + "case_id": case.soar_platform_info.case_id, + "platform_type": ( + case.soar_platform_info.platform_type + ), + } + if case.soar_platform_info + else None + ), + "alert_ids": case.alert_ids, + } + for case in result.cases + ] + } + output_formatter(cases_dict, args.output) except Exception as e: # pylint: disable=broad-exception-caught print(f"Error: {e}", file=sys.stderr) sys.exit(1) + + +def handle_case_get_command(args, chronicle): + """Handle case get command.""" + try: + case = chronicle.get_case(args.id, expand=args.expand) + case_dict = { + "id": case.id, + "display_name": case.display_name, + "stage": case.stage, + "priority": case.priority, + "status": case.status, + } + if case.soar_platform_info: + case_dict["soar_platform_info"] = { + "case_id": case.soar_platform_info.case_id, + "platform_type": case.soar_platform_info.platform_type, + } + if case.alert_ids: + case_dict["alert_ids"] = case.alert_ids + output_formatter(case_dict, args.output) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_list_command(args, chronicle): + """Handle case list command.""" + try: + result = chronicle.list_cases( + page_size=args.page_size, + page_token=args.page_token, + filter_query=args.filter, + order_by=args.order_by, + expand=args.expand, + distinct_by=args.distinct_by, + as_list=args.as_list, + ) + output_formatter(result, args.output) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_patch_command(args, chronicle): + """Handle case patch command.""" + try: + case_data = json.loads(args.data) + case = chronicle.patch_case( + args.id, case_data, update_mask=args.update_mask + ) + case_dict = { + "id": case.id, + "display_name": case.display_name, + "stage": case.stage, + "priority": case.priority, + "status": case.status, + } + output_formatter(case_dict, args.output) + except json.JSONDecodeError as e: + print(f"Error: Invalid JSON data - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_merge_command(args, chronicle): + """Handle case merge command.""" + try: + source_ids = [ + int(case_id.strip()) for case_id in args.source_ids.split(",") + ] + result = chronicle.merge_cases(source_ids, args.target_id) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_add_tag_command(args, chronicle): + """Handle case bulk-add-tag command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + tags = [tag.strip() for tag in args.tags.split(",")] + result = chronicle.execute_bulk_add_tag(case_ids, tags) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_assign_command(args, chronicle): + """Handle case bulk-assign command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + result = chronicle.execute_bulk_assign(case_ids, args.username) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_change_priority_command(args, chronicle): + """Handle case bulk-change-priority command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + result = chronicle.execute_bulk_change_priority(case_ids, args.priority) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_change_stage_command(args, chronicle): + """Handle case bulk-change-stage command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + result = chronicle.execute_bulk_change_stage(case_ids, args.stage) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_close_command(args, chronicle): + """Handle case bulk-close command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + result = chronicle.execute_bulk_close( + case_ids, + args.close_reason, + root_cause=args.root_cause, + close_comment=args.close_comment, + ) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_case_bulk_reopen_command(args, chronicle): + """Handle case bulk-reopen command.""" + try: + case_ids = [int(case_id.strip()) for case_id in args.ids.split(",")] + result = chronicle.execute_bulk_reopen(case_ids, args.reopen_comment) + output_formatter(result, args.output) + except ValueError as e: + print(f"Error: Invalid case ID format - {e}", file=sys.stderr) + sys.exit(1) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/tests/chronicle/test_case.py b/tests/chronicle/test_case.py new file mode 100644 index 0000000..110bc65 --- /dev/null +++ b/tests/chronicle/test_case.py @@ -0,0 +1,810 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for Chronicle case management functions.""" + +import pytest +from unittest.mock import Mock, patch +from secops.chronicle.client import ChronicleClient +from secops.chronicle.case import ( + CasePriority, + execute_bulk_add_tag, + execute_bulk_assign, + execute_bulk_change_priority, + execute_bulk_change_stage, + execute_bulk_close, + execute_bulk_reopen, + get_case, + list_cases, + merge_cases, + patch_case, +) +from secops.chronicle import case as case_module +from secops.chronicle.models import Case +from secops.exceptions import APIError + + +@pytest.fixture +def chronicle_client(): + """Create a Chronicle client for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", + project_id="test-project", + region="us", + ) + + +@pytest.fixture +def mock_case_data(): + """Create mock case data.""" + return { + "id": "12345", + "displayName": "Test Case", + "stage": "Investigation", + "priority": "PRIORITY_HIGH", + "status": "OPEN", + } + + +# Tests for execute_bulk_add_tag + + +def test_execute_bulk_add_tag_success(chronicle_client): + """Test successful bulk add tag operation.""" + mock_return = {} + + with patch.object( + case_module, "chronicle_request", return_value=mock_return + ) as mock_request: + result = execute_bulk_add_tag( + chronicle_client, [123, 456], ["tag1", "tag2"] + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:executeBulkAddTag" + assert call_args[1]["json"] == { + "casesIds": [123, 456], + "tags": ["tag1", "tag2"], + } + assert result == {} + + +def test_execute_bulk_add_tag_api_error(chronicle_client): + """Test bulk add tag with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to add tags to cases"), + ): + with pytest.raises(APIError, match="Failed to add tags to cases"): + execute_bulk_add_tag(chronicle_client, [123], ["tag1"]) + + +def test_execute_bulk_add_tag_empty_tags(chronicle_client): + """Test bulk add tag with empty tags list.""" + with patch.object(case_module, "chronicle_request", return_value={}): + result = execute_bulk_add_tag(chronicle_client, [123], []) + assert result == {} + + +def test_execute_bulk_add_tag_json_parse_error(chronicle_client): + """Test bulk add tag with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_add_tag(chronicle_client, [123], ["tag1"]) + + +# Tests for execute_bulk_assign + + +def test_execute_bulk_assign_success(chronicle_client): + """Test successful bulk assign operation.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_assign( + chronicle_client, [123, 456], "user@example.com" + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:executeBulkAssign" + assert call_args[1]["json"] == { + "casesIds": [123, 456], + "username": "user@example.com", + } + assert result == {} + + +def test_execute_bulk_assign_api_error(chronicle_client): + """Test bulk assign with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to assign cases"), + ): + with pytest.raises(APIError, match="Failed to assign cases"): + execute_bulk_assign(chronicle_client, [123], "user@example.com") + + +def test_execute_bulk_assign_json_parse_error(chronicle_client): + """Test bulk assign with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_assign(chronicle_client, [123], "user@example.com") + + +# Tests for execute_bulk_change_priority + + +def test_execute_bulk_change_priority_with_enum(chronicle_client): + """Test bulk change priority using CasePriority enum.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_change_priority( + chronicle_client, [123, 456], CasePriority.HIGH + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert ( + call_args[1]["endpoint_path"] == "cases:executeBulkChangePriority" + ) + assert str(call_args[1]["json"]["priority"]) == "PRIORITY_HIGH" + assert result == {} + + +def test_execute_bulk_change_priority_with_string(chronicle_client): + """Test bulk change priority using string value.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_change_priority( + chronicle_client, [123], "PRIORITY_MEDIUM" + ) + + call_args = mock_request.call_args + assert str(call_args[1]["json"]["priority"]) == "PRIORITY_MEDIUM" + assert result == {} + + +def test_execute_bulk_change_priority_api_error(chronicle_client): + """Test bulk change priority with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to change case priority"), + ): + with pytest.raises(APIError, match="Failed to change case priority"): + execute_bulk_change_priority( + chronicle_client, [123], CasePriority.HIGH + ) + + +def test_execute_bulk_change_priority_json_parse_error( + chronicle_client, +): + """Test bulk change priority with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_change_priority( + chronicle_client, [123], CasePriority.LOW + ) + + +# Tests for execute_bulk_change_stage + + +def test_execute_bulk_change_stage_success(chronicle_client): + """Test successful bulk change stage operation.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_change_stage( + chronicle_client, [123, 456], "Investigation" + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:executeBulkChangeStage" + assert call_args[1]["json"] == { + "casesIds": [123, 456], + "stage": "Investigation", + } + assert result == {} + + +def test_execute_bulk_change_stage_api_error(chronicle_client): + """Test bulk change stage with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to change case stage"), + ): + with pytest.raises(APIError, match="Failed to change case stage"): + execute_bulk_change_stage(chronicle_client, [123], "InvalidStage") + + +def test_execute_bulk_change_stage_json_parse_error(chronicle_client): + """Test bulk change stage with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_change_stage(chronicle_client, [123], "Investigation") + + +# Tests for execute_bulk_close + + +def test_execute_bulk_close_success(chronicle_client): + """Test successful bulk close operation.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_close( + chronicle_client, + [123, 456], + "NOT_MALICIOUS", + root_cause="No threat", + close_comment="Resolved", + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:executeBulkClose" + json_body = call_args[1]["json"] + assert json_body["casesIds"] == [123, 456] + assert str(json_body["closeReason"]) == "NOT_MALICIOUS" + assert json_body["rootCause"] == "No threat" + assert json_body["closeComment"] == "Resolved" + assert result == {} + + +def test_execute_bulk_close_minimal_params(chronicle_client): + """Test bulk close with minimal parameters.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_close(chronicle_client, [123], "MALICIOUS") + + call_args = mock_request.call_args + json_body = call_args[1]["json"] + assert json_body["casesIds"] == [123] + assert str(json_body["closeReason"]) == "MALICIOUS" + assert result == {} + + +def test_execute_bulk_close_empty_response(chronicle_client): + """Test bulk close with empty response text.""" + with patch.object(case_module, "chronicle_request", return_value={}): + result = execute_bulk_close(chronicle_client, [123], "INCONCLUSIVE") + assert result == {} + + +def test_execute_bulk_close_with_dynamic_params(chronicle_client): + """Test bulk close with dynamic parameters.""" + dynamic_params = [{"key": "value1"}, {"key": "value2"}] + + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_close( + chronicle_client, + [123], + "MAINTENANCE", + dynamic_parameters=dynamic_params, + ) + + call_args = mock_request.call_args + assert call_args[1]["json"]["dynamicParameters"] == dynamic_params + assert result == {} + + +def test_execute_bulk_close_api_error(chronicle_client): + """Test bulk close with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to close cases"), + ): + with pytest.raises(APIError, match="Failed to close cases"): + execute_bulk_close(chronicle_client, [123], "MALICIOUS") + + +def test_execute_bulk_close_json_parse_error(chronicle_client): + """Test bulk close with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_close(chronicle_client, [123], "UNKNOWN") + + +# Tests for execute_bulk_reopen + + +def test_execute_bulk_reopen_success(chronicle_client): + """Test successful bulk reopen operation.""" + with patch.object( + case_module, "chronicle_request", return_value={} + ) as mock_request: + result = execute_bulk_reopen( + chronicle_client, [123, 456], "Reopening for review" + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:executeBulkReopen" + assert call_args[1]["json"] == { + "casesIds": [123, 456], + "reopenComment": "Reopening for review", + } + assert result == {} + + +def test_execute_bulk_reopen_empty_response(chronicle_client): + """Test bulk reopen with empty response text.""" + with patch.object(case_module, "chronicle_request", return_value={}): + result = execute_bulk_reopen(chronicle_client, [123], "Reopen") + assert result == {} + + +def test_execute_bulk_reopen_api_error(chronicle_client): + """Test bulk reopen with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to reopen cases"), + ): + with pytest.raises(APIError, match="Failed to reopen cases"): + execute_bulk_reopen(chronicle_client, [123], "Reopen") + + +def test_execute_bulk_reopen_json_parse_error(chronicle_client): + """Test bulk reopen with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + execute_bulk_reopen(chronicle_client, [123], "Reopen") + + +# Tests for get_case + + +def test_get_case_with_id(chronicle_client, mock_case_data): + """Test get case using just case ID.""" + with patch.object( + case_module, "chronicle_request", return_value=mock_case_data + ) as mock_request: + result = get_case(chronicle_client, "12345") + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "GET" + assert call_args[1]["endpoint_path"] == "cases/12345" + assert not call_args[1]["params"] + + assert isinstance(result, Case) + assert result.id == "12345" + assert result.display_name == "Test Case" + assert result.priority == "PRIORITY_HIGH" + + +def test_get_case_with_full_name(chronicle_client, mock_case_data): + """Test get case using full resource name.""" + full_name = ( + "projects/test-project/locations/us/instances/" + "test-customer/cases/12345" + ) + + with patch.object( + case_module, "chronicle_request", return_value=mock_case_data + ) as mock_request: + result = get_case(chronicle_client, full_name) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["endpoint_path"] == full_name + + assert isinstance(result, Case) + assert result.id == "12345" + + +def test_get_case_with_expand(chronicle_client, mock_case_data): + """Test get case with expand parameter.""" + with patch.object( + case_module, "chronicle_request", return_value=mock_case_data + ) as mock_request: + result = get_case(chronicle_client, "12345", expand="tags,products") + + call_args = mock_request.call_args + assert call_args[1]["params"] == {"expand": "tags,products"} + assert isinstance(result, Case) + + +def test_get_case_api_error(chronicle_client): + """Test get case with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to get case"), + ): + with pytest.raises(APIError, match="Failed to get case"): + get_case(chronicle_client, "99999") + + +def test_get_case_json_parse_error(chronicle_client): + """Test get case with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + get_case(chronicle_client, "12345") + + +# Tests for list_cases + + +def test_list_cases_with_page_size(chronicle_client, mock_case_data): + """Test list cases with page_size (single page).""" + mock_return = { + "cases": [mock_case_data], + "nextPageToken": "next-token", + "totalSize": 100, + } + + with patch.object( + case_module, + "chronicle_paginated_request", + return_value=mock_return, + ) as mock_paginated: + result = list_cases(chronicle_client, page_size=10) + + mock_paginated.assert_called_once() + call_args = mock_paginated.call_args + assert call_args[1]["page_size"] == 10 + assert call_args[1]["items_key"] == "cases" + + assert len(result["cases"]) == 1 + assert result["nextPageToken"] == "next-token" + assert result["totalSize"] == 100 + + +def test_list_cases_auto_pagination(chronicle_client, mock_case_data): + """Test list cases auto-pagination (page_size=None).""" + mock_case_data_2 = mock_case_data.copy() + mock_case_data_2["id"] = "67890" + + mock_return = { + "cases": [mock_case_data, mock_case_data_2], + "nextPageToken": "", + "totalSize": 2, + } + + with patch.object( + case_module, + "chronicle_paginated_request", + return_value=mock_return, + ) as mock_paginated: + result = list_cases(chronicle_client, page_size=None) + + mock_paginated.assert_called_once() + call_args = mock_paginated.call_args + assert call_args[1]["page_size"] is None + + assert len(result["cases"]) == 2 + assert result["nextPageToken"] == "" + assert result["totalSize"] == 2 + + +def test_list_cases_with_filters(chronicle_client, mock_case_data): + """Test list cases with filter, order, and expand.""" + mock_return = { + "cases": [mock_case_data], + "nextPageToken": "", + "totalSize": 1, + } + + with patch.object( + case_module, + "chronicle_paginated_request", + return_value=mock_return, + ) as mock_paginated: + result = list_cases( + chronicle_client, + page_size=50, + filter_query='priority="HIGH"', + order_by="createdTime desc", + expand="tags", + distinct_by="priority", + ) + + call_args = mock_paginated.call_args + extra_params = call_args[1]["extra_params"] + assert extra_params["filter"] == 'priority="HIGH"' + assert extra_params["orderBy"] == "createdTime desc" + assert extra_params["expand"] == "tags" + assert extra_params["distinctBy"] == "priority" + + +def test_list_cases_with_page_token(chronicle_client, mock_case_data): + """Test list cases with page token.""" + mock_return = { + "cases": [mock_case_data], + "nextPageToken": "", + "totalSize": 1, + } + + with patch.object( + case_module, + "chronicle_paginated_request", + return_value=mock_return, + ) as mock_paginated: + result = list_cases( + chronicle_client, page_size=10, page_token="some-token" + ) + + call_args = mock_paginated.call_args + assert call_args[1]["page_token"] == "some-token" + + +def test_list_cases_api_error(chronicle_client): + """Test list cases with API error.""" + with patch.object( + case_module, + "chronicle_paginated_request", + side_effect=APIError("Failed to list cases"), + ): + with pytest.raises(APIError): + list_cases(chronicle_client, page_size=10) + + +def test_list_cases_json_parse_error(chronicle_client): + """Test list cases with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_paginated_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + list_cases(chronicle_client, page_size=10) + + +def test_list_cases_empty_results(chronicle_client): + """Test list cases with no results.""" + mock_return = { + "cases": [], + "nextPageToken": "", + "totalSize": 0, + } + + with patch.object( + case_module, + "chronicle_paginated_request", + return_value=mock_return, + ): + result = list_cases(chronicle_client, page_size=10) + + assert len(result["cases"]) == 0 + assert result["totalSize"] == 0 + + +# Tests for merge_cases + + +def test_merge_cases_success(chronicle_client): + """Test successful merge cases operation.""" + mock_return = { + "newCaseId": 999, + "isRequestValid": True, + } + + with patch.object( + case_module, "chronicle_request", return_value=mock_return + ) as mock_request: + result = merge_cases(chronicle_client, [123, 456], 789) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["endpoint_path"] == "cases:merge" + assert set(call_args[1]["json"]["casesIds"]) == {123, 456, 789} + assert call_args[1]["json"]["caseToMergeWith"] == 789 + assert result["newCaseId"] == 999 + assert result["isRequestValid"] is True + + +def test_merge_cases_invalid_request(chronicle_client): + """Test merge cases with invalid request.""" + mock_return = { + "isRequestValid": False, + "errors": ["Cannot merge cases from different tenants"], + } + + with patch.object( + case_module, "chronicle_request", return_value=mock_return + ): + result = merge_cases(chronicle_client, [123, 456], 789) + + assert result["isRequestValid"] is False + assert len(result["errors"]) == 1 + + +def test_merge_cases_api_error(chronicle_client): + """Test merge cases with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to merge cases"), + ): + with pytest.raises(APIError, match="Failed to merge cases"): + merge_cases(chronicle_client, [123, 456], 789) + + +def test_merge_cases_json_parse_error(chronicle_client): + """Test merge cases with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + merge_cases(chronicle_client, [123, 456], 789) + + +# Tests for patch_case + + +def test_patch_case_with_id(chronicle_client, mock_case_data): + """Test patch case using just case ID.""" + updated_data = mock_case_data.copy() + updated_data["priority"] = "PRIORITY_CRITICAL" + + case_update = {"priority": "PRIORITY_CRITICAL"} + + with patch.object( + case_module, "chronicle_request", return_value=updated_data + ) as mock_request: + result = patch_case( + chronicle_client, + "12345", + case_update, + update_mask="priority", + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "PATCH" + assert call_args[1]["endpoint_path"] == "cases/12345" + assert str(call_args[1]["json"]["priority"]) == "PRIORITY_CRITICAL" + assert call_args[1]["params"] == {"updateMask": "priority"} + + assert isinstance(result, Case) + assert result.priority == "PRIORITY_CRITICAL" + + +def test_patch_case_with_full_name(chronicle_client, mock_case_data): + """Test patch case using full resource name.""" + full_name = ( + "projects/test-project/locations/us/instances/" + "test-customer/cases/12345" + ) + + with patch.object( + case_module, "chronicle_request", return_value=mock_case_data + ) as mock_request: + result = patch_case(chronicle_client, full_name, {"status": "CLOSED"}) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["endpoint_path"] == full_name + assert isinstance(result, Case) + + +def test_patch_case_without_update_mask(chronicle_client, mock_case_data): + """Test patch case without update mask.""" + with patch.object( + case_module, "chronicle_request", return_value=mock_case_data + ) as mock_request: + result = patch_case( + chronicle_client, "12345", {"displayName": "Updated"} + ) + + call_args = mock_request.call_args + assert call_args[1]["params"] is None + assert isinstance(result, Case) + + +def test_patch_case_multiple_fields(chronicle_client, mock_case_data): + """Test patch case with multiple fields.""" + updated_data = mock_case_data.copy() + updated_data["priority"] = "PRIORITY_LOW" + updated_data["stage"] = "Closed" + + case_update = { + "priority": "PRIORITY_LOW", + "stage": "Closed", + } + + with patch.object( + case_module, "chronicle_request", return_value=updated_data + ) as mock_request: + result = patch_case( + chronicle_client, + "12345", + case_update, + update_mask="priority,stage", + ) + + call_args = mock_request.call_args + assert str(call_args[1]["json"]["priority"]) == "PRIORITY_LOW" + assert call_args[1]["json"]["stage"] == "Closed" + assert result.priority == "PRIORITY_LOW" + assert result.stage == "Closed" + + +def test_patch_case_api_error(chronicle_client): + """Test patch case with API error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Failed to patch case"), + ): + with pytest.raises(APIError, match="Failed to patch case"): + patch_case(chronicle_client, "99999", {"status": "CLOSED"}) + + +def test_patch_case_json_parse_error(chronicle_client): + """Test patch case with JSON parsing error.""" + with patch.object( + case_module, + "chronicle_request", + side_effect=APIError("Expected JSON response"), + ): + with pytest.raises(APIError): + patch_case(chronicle_client, "12345", {"status": "CLOSED"}) diff --git a/tests/chronicle/test_case_integration.py b/tests/chronicle/test_case_integration.py new file mode 100644 index 0000000..93f4615 --- /dev/null +++ b/tests/chronicle/test_case_integration.py @@ -0,0 +1,219 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for Chronicle case management. + +These tests require valid credentials and API access. +They interact with real Chronicle API endpoints. +""" + +import pytest +from secops import SecOpsClient +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON +from secops.exceptions import APIError + + +@pytest.mark.integration +def test_list_and_get_cases_workflow(): + """Test listing and getting cases workflow.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Test basic list + result = chronicle.list_cases(page_size=5) + assert isinstance(result, dict) + assert "cases" in result + assert isinstance(result["cases"], list) + assert "totalSize" in result + + # Test with as_list=False (default) + result_dict = chronicle.list_cases(page_size=3, as_list=False) + assert isinstance(result_dict, dict) + assert "cases" in result_dict + assert "nextPageToken" in result_dict or "totalSize" in result_dict + + # Test with as_list=True + result_list = chronicle.list_cases(page_size=3, as_list=True) + assert isinstance(result_list, list) + if result_list: + assert "name" in result_list[0] + + # Test list with filter + filtered = chronicle.list_cases( + page_size=5, filter_query='status = "OPENED"' + ) + assert isinstance(filtered, dict) + assert "cases" in filtered + + # Test get case by ID + if result.get("cases"): + case_data = result["cases"][0] + case_id = case_data.get("name", "").split("/")[-1] + + if case_id: + case = chronicle.get_case(case_id) + assert case is not None + assert hasattr(case, "id") + assert hasattr(case, "display_name") + assert hasattr(case, "priority") + assert hasattr(case, "status") + else: + pytest.skip("No cases available for testing") + + +@pytest.mark.integration +def test_case_update_workflow(): + """Test case update (patch) workflow. + + Tests patching a case's priority and verifying the change. + """ + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Use dedicated test case ID + case_id = "7418669" + + # Get original case state + original_case = chronicle.get_case(case_id) + original_priority = original_case.priority + + # Determine new priority (toggle between HIGH and MEDIUM) + new_priority = ( + "PRIORITY_MEDIUM" + if original_priority == "PRIORITY_HIGH" + else "PRIORITY_HIGH" + ) + + try: + # Update the case + updated_case = chronicle.patch_case( + case_id, {"priority": new_priority}, update_mask="priority" + ) + + assert updated_case is not None + assert updated_case.priority == new_priority + + # Verify by fetching again + verified_case = chronicle.get_case(case_id) + assert verified_case.priority == new_priority + + finally: + # Cleanup: Restore original priority + try: + chronicle.patch_case( + case_id, + {"priority": original_priority}, + update_mask="priority", + ) + except Exception as e: + print(f"Cleanup warning: Failed to restore priority - {e}") + + +@pytest.mark.integration +def test_bulk_operations_workflow(): + """Test bulk operations workflow including tag, priority, stage.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Use dedicated test case ID + case_ids = [7418669] + + print(f"\nTesting bulk operations on cases: {case_ids}") + + # Test bulk add tag + + result = chronicle.execute_bulk_add_tag(case_ids, ["integration-test-tag"]) + assert isinstance(result, dict) + print("Bulk add tag: SUCCESS") + + # Test bulk change priority + result = chronicle.execute_bulk_change_priority(case_ids, "PRIORITY_MEDIUM") + assert isinstance(result, dict) + print("Bulk change priority: SUCCESS") + + # Test bulk change stage + result = chronicle.execute_bulk_change_stage(case_ids, "Triage") + assert isinstance(result, dict) + print("Bulk change stage: SUCCESS") + + +@pytest.mark.integration +def test_bulk_assign(): + """Test bulk assign operation.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Use dedicated test case ID + case_ids = [7418669] + + try: + result = chronicle.execute_bulk_assign(case_ids, "@Administrator") + assert isinstance(result, dict) + print("Bulk assign: SUCCESS") + except APIError as e: + error_msg = str(e) + # Skip if API returns INTERNAL/500 error + if "INTERNAL" in error_msg or "500" in error_msg: + pytest.skip(f"Bulk assign API returned INTERNAL error: {e}") + # Re-raise other errors + raise + + +@pytest.mark.integration +def test_bulk_close_reopen_workflow(): + """Test bulk close and reopen workflow. + + This test closes cases and then reopens them. + """ + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Use dedicated test case ID + case_ids = [7418669] + + print(f"\nTesting close/reopen workflow on cases: {case_ids}") + + try: + # Close the cases + close_result = chronicle.execute_bulk_close( + case_ids=case_ids, + close_reason="MAINTENANCE", + root_cause="Integration test - closing for testing", + ) + assert isinstance(close_result, dict) + print("Bulk close: SUCCESS") + + # Verify cases are closed by fetching one + if case_ids: + case = chronicle.get_case(str(case_ids[0])) + assert case.status == "CLOSED" + print(f"Verified case {case_ids[0]} is CLOSED") + + finally: + # Cleanup: Reopen the cases + try: + reopen_result = chronicle.execute_bulk_reopen( + case_ids, "Integration test - reopening after test" + ) + assert isinstance(reopen_result, dict) + print("Bulk reopen (cleanup): SUCCESS") + + # Verify case is reopened + if case_ids: + case = chronicle.get_case(str(case_ids[0])) + assert case.status == "OPENED" + print(f"Verified case {case_ids[0]} is OPENED") + + except Exception as e: + print(f"Cleanup warning: Failed to reopen cases - {e}") diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index e9cb648..d8ba100 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -344,14 +344,14 @@ def test_get_cases(chronicle_client): } with patch.object( - chronicle_client.session, "get", return_value=mock_response - ) as mock_get: + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: result = chronicle_client.get_cases(["case-123"]) # Verify the correct endpoint was called - mock_get.assert_called_once() - call_args = mock_get.call_args - assert "legacy:legacyBatchGetCases" in call_args[0][0] + mock_request.assert_called_once() + call_args = mock_request.call_args + assert "legacy:legacyBatchGetCases" in call_args[1]["url"] # Verify the correct parameter name was used assert call_args[1]["params"] == {"names": ["case-123"]} @@ -384,7 +384,9 @@ def test_get_cases_filtering(chronicle_client): ] } - with patch.object(chronicle_client.session, "get", return_value=mock_response): + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ): result = chronicle_client.get_cases(["case-1", "case-2"]) high_priority = result.filter_by_priority("PRIORITY_HIGH") @@ -401,8 +403,11 @@ def test_get_cases_error(chronicle_client): mock_response = Mock() mock_response.status_code = 400 mock_response.text = "Invalid request" + mock_response.json.return_value = {"error": "Invalid request"} - with patch.object(chronicle_client.session, "get", return_value=mock_response): + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ): with pytest.raises(APIError, match="Failed to get cases"): chronicle_client.get_cases(["invalid-id"]) diff --git a/tests/cli/test_case_integration.py b/tests/cli/test_case_integration.py new file mode 100644 index 0000000..7c162ca --- /dev/null +++ b/tests/cli/test_case_integration.py @@ -0,0 +1,327 @@ +"""Integration tests for the SecOps CLI case commands. + +These tests require valid credentials and API access. +They interact with real Chronicle API endpoints via CLI. +""" + +import json +import subprocess + +import pytest + + +@pytest.mark.integration +def test_cli_list_and_get_cases_workflow(cli_env, common_args): + """Test CLI case list and get workflow.""" + # Test basic list + list_cmd = ["secops"] + common_args + ["case", "list", "--page-size", "3"] + list_result = subprocess.run( + list_cmd, env=cli_env, capture_output=True, text=True + ) + assert list_result.returncode == 0 + + try: + list_output = json.loads(list_result.stdout) + assert "cases" in list_output + assert isinstance(list_output["cases"], list) + assert "totalSize" in list_output + except json.JSONDecodeError: + assert "Error:" not in list_result.stdout + + # Test list with --as-list flag + as_list_cmd = ( + ["secops"] + + common_args + + ["case", "list", "--page-size", "3", "--as-list"] + ) + as_list_result = subprocess.run( + as_list_cmd, env=cli_env, capture_output=True, text=True + ) + assert as_list_result.returncode == 0 + + try: + as_list_output = json.loads(as_list_result.stdout) + assert isinstance(as_list_output, list) + if as_list_output: + assert "name" in as_list_output[0] + except json.JSONDecodeError: + assert "Error:" not in as_list_result.stdout + + # Test list with filter + filter_cmd = ( + ["secops"] + + common_args + + ["case", "list", "--page-size", "5", "--filter", 'status = "OPENED"'] + ) + filter_result = subprocess.run( + filter_cmd, env=cli_env, capture_output=True, text=True + ) + assert filter_result.returncode == 0 + + try: + filter_output = json.loads(filter_result.stdout) + assert "cases" in filter_output + except json.JSONDecodeError: + assert "Error:" not in filter_result.stdout + + # Test get case by ID + try: + if list_output.get("cases"): + case_name = list_output["cases"][0].get("name", "") + case_id = case_name.split("/")[-1] + + if case_id: + get_cmd = ( + ["secops"] + common_args + ["case", "get", "--id", case_id] + ) + get_result = subprocess.run( + get_cmd, env=cli_env, capture_output=True, text=True + ) + + assert get_result.returncode == 0 + + get_output = json.loads(get_result.stdout) + assert "id" in get_output or "display_name" in get_output + assert "priority" in get_output + assert "status" in get_output + except (json.JSONDecodeError, KeyError): + pass + + +@pytest.mark.integration +def test_cli_case_update(cli_env, common_args): + """Test the case update command.""" + # Use dedicated test case ID + case_id = "7418669" + + # Get original case state + get_cmd = ["secops"] + common_args + ["case", "get", "--id", case_id] + get_result = subprocess.run( + get_cmd, env=cli_env, capture_output=True, text=True + ) + + if get_result.returncode != 0: + pytest.skip("Unable to get test case") + + try: + get_output = json.loads(get_result.stdout) + original_priority = get_output.get("priority", "PRIORITY_MEDIUM") + + # Determine new priority + new_priority = ( + "PRIORITY_MEDIUM" + if original_priority == "PRIORITY_HIGH" + else "PRIORITY_HIGH" + ) + + # Update the case + update_cmd = ( + ["secops"] + + common_args + + [ + "case", + "update", + "--id", + case_id, + "--data", + f'{{"priority": "{new_priority}"}}', + "--update-mask", + "priority", + ] + ) + + update_result = subprocess.run( + update_cmd, env=cli_env, capture_output=True, text=True + ) + + assert update_result.returncode == 0 + + update_output = json.loads(update_result.stdout) + assert update_output.get("priority") == new_priority + + # Cleanup: Restore original priority + restore_cmd = ( + ["secops"] + + common_args + + [ + "case", + "update", + "--id", + case_id, + "--data", + f'{{"priority": "{original_priority}"}}', + "--update-mask", + "priority", + ] + ) + subprocess.run(restore_cmd, env=cli_env, capture_output=True) + + except (json.JSONDecodeError, KeyError): + pytest.skip("Unable to parse JSON output or extract data") + + +@pytest.mark.integration +def test_cli_case_bulk_add_tag(cli_env, common_args): + """Test the case bulk-add-tag command.""" + # Use dedicated test case ID + case_ids = ["7418669"] + + # Test bulk add tag + bulk_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-add-tag", + "--ids", + ",".join(case_ids), + "--tags", + "cli-integration-test", + ] + ) + + bulk_result = subprocess.run( + bulk_cmd, env=cli_env, capture_output=True, text=True + ) + + assert bulk_result.returncode == 0 + + +@pytest.mark.integration +def test_cli_case_bulk_assign(cli_env, common_args): + """Test the case bulk-assign command. + + Skips test if API returns 500/INTERNAL error. + """ + # Use dedicated test case ID + case_ids = ["7418669"] + + bulk_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-assign", + "--ids", + ",".join(case_ids), + "--username", + "'@Administrator'", + ] + ) + + bulk_result = subprocess.run( + bulk_cmd, env=cli_env, capture_output=True, text=True + ) + + # Skip if API returns INTERNAL/500 error + if bulk_result.returncode != 0: + if "INTERNAL" in bulk_result.stderr or "500" in bulk_result.stderr: + pytest.skip( + f"Bulk assign API returned INTERNAL error: " + f"{bulk_result.stderr}" + ) + + assert bulk_result.returncode == 0 + + +@pytest.mark.integration +def test_cli_case_bulk_change_priority(cli_env, common_args): + """Test the case bulk-change-priority command.""" + # Use dedicated test case ID + case_ids = ["7418669"] + + bulk_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-change-priority", + "--ids", + ",".join(case_ids), + "--priority", + "MEDIUM", + ] + ) + + bulk_result = subprocess.run( + bulk_cmd, env=cli_env, capture_output=True, text=True + ) + + assert bulk_result.returncode == 0 + + +@pytest.mark.integration +def test_cli_case_bulk_change_stage(cli_env, common_args): + """Test the case bulk-change-stage command.""" + # Use dedicated test case ID + case_ids = ["7418669"] + + bulk_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-change-stage", + "--ids", + ",".join(case_ids), + "--stage", + "Triage", + ] + ) + + bulk_result = subprocess.run( + bulk_cmd, env=cli_env, capture_output=True, text=True + ) + + assert bulk_result.returncode == 0 + + +@pytest.mark.integration +def test_cli_case_bulk_close_reopen_workflow(cli_env, common_args): + """Test the case bulk-close and bulk-reopen commands in workflow.""" + # Use dedicated test case ID + case_ids = ["7418669"] + + try: + # Test bulk close + close_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-close", + "--ids", + ",".join(case_ids), + "--close-reason", + "MAINTENANCE", + "--root-cause", + "CLI integration test", + ] + ) + + close_result = subprocess.run( + close_cmd, env=cli_env, capture_output=True, text=True + ) + + assert close_result.returncode == 0 + + finally: + # Cleanup: Test bulk reopen + reopen_cmd = ( + ["secops"] + + common_args + + [ + "case", + "bulk-reopen", + "--ids", + ",".join(case_ids), + "--reopen-comment", + "CLI integration test cleanup", + ] + ) + + reopen_result = subprocess.run( + reopen_cmd, env=cli_env, capture_output=True, text=True + ) + + assert reopen_result.returncode == 0