Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,23 @@ secops alert --snapshot-query "feedback_summary.status != \"CLOSED\"" --time-win
secops alert --baseline-query "detection.rule_name = \"My Rule\"" --time-window 24
```

### Rule Retrohunt Management

List all retrohunts for a rule
```bash
secops rule-retrohunt list --rule-id "ru_abcdef"
```

Create a new retrohunt for a rule
```bash
secops rule-retrohunt create --rule-id "ru_abcdef" --start-time "2026-01-01T00:00:00Z" --end-time "2026-01-02T00:00:00Z"
```

Get specific retrohunt details
```bash
secops rule-retrohunt get --rule-id "ru_abcdef" --operation-id "oh_abcdef"
```

### Rule Exclusions Management

Rule Exclusions allow you to exclude specific events from triggering detections in Chronicle. Use these commands to manage rule exclusions and their deployments:
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,10 @@ operation_id = retrohunt.get("name", "").split("/")[-1]

# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
state = retrohunt_status.get("state", "")

# List retrohunts for a rule
retrohunts = chronicle.list_retrohunts(rule_id)
```

### Detections and Errors
Expand Down
6 changes: 3 additions & 3 deletions api_module_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
| rules.list | v1 | chronicle.rule.list_rules | secops rule list |
| rules.listRevisions | v1 | | |
| rules.patch | v1 | chronicle.rule.update_rule | secops rule update |
| rules.retrohunts.create | v1 | chronicle.rule_retrohunt.create_retrohunt | |
| rules.retrohunts.get | v1 | chronicle.rule_retrohunt.get_retrohunt | |
| rules.retrohunts.list | v1 | | |
| rules.retrohunts.create | v1 | chronicle.rule_retrohunt.create_retrohunt | secops rule-retrohunt create |
| rules.retrohunts.get | v1 | chronicle.rule_retrohunt.get_retrohunt | secops rule-retrohunt get |
| rules.retrohunts.list | v1 | chronicle.rule_retrohunt.list_retrohunts | secops rule-retrohunt list |
| rules.updateDeployment | v1 | chronicle.rule.enable_rule | secops rule enable |
| watchlists.create | v1 | chronicle.watchlist.create_watchlist | secops watchlist create |
| watchlists.delete | v1 | chronicle.watchlist.delete_watchlist | secops watchlist delete |
Expand Down
7 changes: 6 additions & 1 deletion src/secops/chronicle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@
patch_rule_exclusion,
update_rule_exclusion_deployment,
)
from secops.chronicle.rule_retrohunt import create_retrohunt, get_retrohunt
from secops.chronicle.rule_retrohunt import (
create_retrohunt,
get_retrohunt,
list_retrohunts,
)
from secops.chronicle.rule_set import (
batch_update_curated_rule_set_deployments,
get_curated_rule,
Expand Down Expand Up @@ -277,6 +281,7 @@
# Rule retrohunt operations
"create_retrohunt",
"get_retrohunt",
"list_retrohunts",
# Rule set operations
"batch_update_curated_rule_set_deployments",
"list_curated_rule_sets",
Expand Down
36 changes: 35 additions & 1 deletion src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,9 @@
)
from secops.chronicle.rule_retrohunt import (
create_retrohunt as _create_retrohunt,
get_retrohunt as _get_retrohunt,
list_retrohunts as _list_retrohunts,
)
from secops.chronicle.rule_retrohunt import get_retrohunt as _get_retrohunt
from secops.chronicle.rule_set import (
batch_update_curated_rule_set_deployments as _batch_update_curated_rule_set_deployments, # pylint: disable=line-too-long
)
Expand Down Expand Up @@ -2226,6 +2227,39 @@ def get_retrohunt(
"""
return _get_retrohunt(self, rule_id, operation_id, api_version)

def list_retrohunts(
self,
rule_id: str,
page_size: int | None = None,
page_token: str | None = None,
api_version: APIVersion | None = APIVersion.V1,
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Get a list of retrohunts for a rule.

Args:
rule_id: Unique ID of the rule to list retrohunts for
page_size: Page size to use for paginated results
page_token: Page token to use for paginated results
api_version: Preferred API version to use. Defaults to V1
as_list: Whether to return results as a list or dictionary

Returns:
If as_list is True: List of retrohunts.
If as_list is False: Dict with retrohunts list and nextPageToken.

Raises:
APIError: If the API request fails
"""
return _list_retrohunts(
self,
rule_id,
page_size,
page_token,
api_version,
as_list,
)

# Parser Management methods

def activate_parser(
Expand Down
79 changes: 57 additions & 22 deletions src/secops/chronicle/rule_retrohunt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
"""Retrohunt functionality for Chronicle rules."""

from datetime import datetime
from typing import Any
from typing import Any, TYPE_CHECKING

from secops.chronicle.models import APIVersion
from secops.exceptions import APIError
from secops.chronicle.utils.request_utils import (
chronicle_request,
chronicle_paginated_request,
)

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def create_retrohunt(
client,
client: "ChronicleClient",
rule_id: str,
start_time: datetime,
end_time: datetime,
Expand All @@ -46,28 +52,25 @@ def create_retrohunt(
Raises:
APIError: If the API request fails
"""
url = (
f"{client.base_url(api_version, list(APIVersion))}/"
f"{client.instance_id}/rules/{rule_id}/retrohunts"
)

body = {
"process_interval": {
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
},
}

response = client.session.post(url, json=body)

if response.status_code != 200:
raise APIError(f"Failed to create retrohunt: {response.text}")

return response.json()
return chronicle_request(
client,
method="POST",
endpoint_path=f"rules/{rule_id}/retrohunts",
json=body,
api_version=api_version,
error_message="Failed to create retrohunt",
)


def get_retrohunt(
client,
client: "ChronicleClient",
rule_id: str,
operation_id: str,
api_version: APIVersion | None = APIVersion.V1,
Expand All @@ -87,14 +90,46 @@ def get_retrohunt(
Raises:
APIError: If the API request fails
"""
url = (
f"{client.base_url(api_version, list(APIVersion))}/"
f"{client.instance_id}/rules/{rule_id}/retrohunts/{operation_id}"
return chronicle_request(
client,
method="GET",
endpoint_path=f"rules/{rule_id}/retrohunts/{operation_id}",
api_version=api_version,
error_message="Failed to get retrohunt",
)

response = client.session.get(url)

if response.status_code != 200:
raise APIError(f"Failed to get retrohunt: {response.text}")
def list_retrohunts(
client: "ChronicleClient",
rule_id: str,
page_size: int | None = None,
page_token: str | None = None,
api_version: APIVersion | None = APIVersion.V1,
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Get a list of retrohunts for a rule.

Args:
client: ChronicleClient instance
rule_id: Unique ID of the rule to list retrohunts for
page_size: Page size to use for paginated results
page_token: Page token to use for paginated results
api_version: Preferred API version to use. Defaults to V1
as_list: Whether to return results as a list or dictionary

Returns:
If as_list is True: List of retrohunts.
If as_list is False: Dict with retrohunts list and nextPageToken.

return response.json()
Raises:
APIError: If the API request fails
"""
return chronicle_paginated_request(
client,
api_version=api_version,
path=f"rules/{rule_id}/retrohunts",
items_key="retrohunts",
page_size=page_size,
page_token=page_token,
as_list=as_list,
)
30 changes: 19 additions & 11 deletions src/secops/chronicle/rule_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@
#
"""Rule validation functionality for Chronicle."""

from typing import NamedTuple
from typing import NamedTuple, TYPE_CHECKING

from secops.exceptions import APIError
from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import (
chronicle_request,
)

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


class ValidationResult(NamedTuple):
Expand All @@ -34,7 +40,9 @@ class ValidationResult(NamedTuple):
position: dict[str, int] | None = None


def validate_rule(client, rule_text: str) -> ValidationResult:
def validate_rule(
client: "ChronicleClient", rule_text: str
) -> ValidationResult:
"""Validates a YARA-L2 rule against the Chronicle API.

Args:
Expand All @@ -51,20 +59,20 @@ def validate_rule(client, rule_text: str) -> ValidationResult:
Raises:
APIError: If the API request fails
"""
url = f"{client.base_url}/{client.instance_id}:verifyRuleText"

# Clean up the rule text by removing leading/trailing backticks and
# whitespace
cleaned_rule = rule_text.strip("` \n\t\r")

body = {"ruleText": cleaned_rule}

response = client.session.post(url, json=body)

if response.status_code != 200:
raise APIError(f"Failed to validate rule: {response.text}")

result = response.json()
result = chronicle_request(
client,
method="POST",
endpoint_path=":verifyRuleText",
json=body,
api_version=APIVersion.V1ALPHA,
error_message="Failed to validate rule",
)

# Check if the response indicates success
if result.get("success", False):
Expand Down
1 change: 1 addition & 0 deletions src/secops/chronicle/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def get_stats(
api_version=APIVersion.V1ALPHA,
params=params,
timeout=timeout,
error_message="Failed to get stats",
)

if "stats" not in results:
Expand Down
2 changes: 2 additions & 0 deletions src/secops/cli/cli_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from secops.cli.commands.stats import setup_stats_command
from secops.cli.commands.udm_search import setup_udm_search_view_command
from secops.cli.commands.watchlist import setup_watchlist_command
from secops.cli.commands.rule_retrohunt import setup_rule_retrohunt_command
from secops.cli.utils.common_args import add_chronicle_args, add_common_args
from secops.cli.utils.config_utils import load_config
from secops.exceptions import AuthenticationError, SecOpsError
Expand Down Expand Up @@ -187,6 +188,7 @@ def build_parser() -> argparse.ArgumentParser:
setup_dashboard_command(subparsers)
setup_dashboard_query_command(subparsers)
setup_watchlist_command(subparsers)
setup_rule_retrohunt_command(subparsers)

return parser

Expand Down
Loading