From 3ba8f0f9783ed40df6d737170bd871f55fbd1ccf Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Wed, 4 Feb 2026 21:46:53 +0000 Subject: [PATCH 1/6] feat: migrate to helper function --- src/secops/chronicle/search.py | 71 +++++++++++----------------------- 1 file changed, 22 insertions(+), 49 deletions(-) diff --git a/src/secops/chronicle/search.py b/src/secops/chronicle/search.py index 037a6e5..bb7f66c 100644 --- a/src/secops/chronicle/search.py +++ b/src/secops/chronicle/search.py @@ -15,15 +15,19 @@ """UDM search functionality for Chronicle.""" from datetime import datetime -from typing import Any +from typing import Any, TYPE_CHECKING -import requests +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_request, +) -from secops.exceptions import APIError +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient def search_udm( - client, + client: "ChronicleClient", query: str, start_time: datetime, end_time: datetime, @@ -53,15 +57,8 @@ def search_udm( Raises: APIError: If the API request fails """ - # Unused parameters, kept for backward compatibility - _ = (case_insensitive, max_attempts) - - # Format the instance ID for the API call - instance = client.instance_id - - # Endpoint for UDM search - url = f"{client.base_url}/{instance}:udmSearch" + _ = (case_insensitive, max_attempts, timeout) # Format times for the API start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") @@ -79,40 +76,16 @@ def search_udm( print(f"Executing UDM search: {query}") print(f"Time range: {start_time_str} to {end_time_str}") - try: - response = client.session.get(url, params=params, timeout=timeout) - - if response.status_code != 200: - error_msg = ( - f"Error executing search: Status {response.status_code}, " - f"Response: {response.text}" - ) - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) - - # Parse the response - response_data = response.json() - - # Extract events and metadata - events = response_data.get("events", []) - more_data_available = response_data.get("moreDataAvailable", False) - - if debug: - print(f"Found {len(events)} events") - print(f"More data available: {more_data_available}") - - # Build the result structure to match the expected format - result = { - "events": events, - "total_events": len(events), - "more_data_available": more_data_available, - } - - return result - - except requests.exceptions.RequestException as e: - error_msg = f"Request failed: {str(e)}" - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) from e + result = chronicle_request( + client, + method="GET", + endpoint_path=":udmSearch", + api_version=APIVersion.V1ALPHA, + params=params, + ) + + return { + "events": result.get("events", []), + "total_events": len(result.get("events", [])), + "more_data_available": result.get("moreDataAvailable", False), + } From 86b2ace2e49638f77110a8a637bc8e295fffb9e6 Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Wed, 4 Feb 2026 21:47:29 +0000 Subject: [PATCH 2/6] feat: implement testing for search.py --- tests/chronicle/test_client.py | 35 ---------------------------------- 1 file changed, 35 deletions(-) diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index e9cb648..f826590 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -90,41 +90,6 @@ def test_chronicle_client_custom_session_user_agent(): assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk" -def test_search_udm(chronicle_client): - """Test UDM search functionality.""" - # Mock the search request - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "events": [ - { - "name": "projects/test-project/locations/us/instances/test-instance/events/event1", - "udm": { - "metadata": { - "eventTimestamp": "2024-01-01T00:00:00Z", - "eventType": "NETWORK_CONNECTION", - }, - "target": {"ip": "192.168.1.1", "hostname": "test-host"}, - }, - } - ], - "moreDataAvailable": False, - } - - with patch.object(chronicle_client.session, "get", return_value=mock_response): - result = chronicle_client.search_udm( - query='target.ip != ""', - start_time=datetime(2024, 1, 1, tzinfo=timezone.utc), - end_time=datetime(2024, 1, 2, tzinfo=timezone.utc), - max_events=10, - ) - - assert "events" in result - assert "total_events" in result - assert result["total_events"] == 1 - assert result["events"][0]["udm"]["target"]["ip"] == "192.168.1.1" - - @patch("secops.chronicle.entity._detect_value_type_for_query") @patch("secops.chronicle.entity._summarize_entity_by_id") def test_summarize_entity_ip(mock_summarize_by_id, mock_detect, chronicle_client): From 51b66675ee7f3e99dbde4ee9db5730d9ed94554d Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Wed, 4 Feb 2026 21:47:40 +0000 Subject: [PATCH 3/6] feat: implement testing for search.py --- tests/chronicle/test_search.py | 107 +++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 tests/chronicle/test_search.py diff --git a/tests/chronicle/test_search.py b/tests/chronicle/test_search.py new file mode 100644 index 0000000..0fe7c20 --- /dev/null +++ b/tests/chronicle/test_search.py @@ -0,0 +1,107 @@ +"""Tests for Chronicle UDM search functionality (search_udm).""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timedelta, timezone +from unittest import mock + +from secops.chronicle.models import APIVersion +from secops.chronicle.search import search_udm + + +class TestChronicleUdmSearch(unittest.TestCase): + """Tests for Chronicle search functionality.""" + + def setUp(self) -> None: + self.client = mock.MagicMock() + self.start_time = datetime.now(tz=timezone.utc) - timedelta(days=1) + self.end_time = datetime.now(tz=timezone.utc) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_returns_expected_shape(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = { + "events": [{"id": 1}, {"id": 2}], + "moreDataAvailable": True, + } + + result = search_udm( + client=self.client, + query='metadata.event_type = "NETWORK_CONNECTION"', + start_time=self.start_time, + end_time=self.end_time, + max_events=500, + ) + + self.assertEqual(result["events"], [{"id": 1}, {"id": 2}]) + self.assertEqual(result["total_events"], 2) + self.assertTrue(result["more_data_available"]) + + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + + self.assertEqual(kwargs["method"], "GET") + self.assertEqual(kwargs["endpoint_path"], ":udmSearch") + self.assertEqual(kwargs["api_version"], APIVersion.V1ALPHA) + + params = kwargs["params"] + self.assertEqual(params["query"], 'metadata.event_type = "NETWORK_CONNECTION"') + self.assertEqual(params["limit"], 500) + self.assertEqual( + params["timeRange.start_time"], + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + self.assertEqual( + params["timeRange.end_time"], + self.end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_defaults_when_keys_missing(self, mock_chronicle_request: mock.MagicMock) -> None: + # Missing "events" and "moreDataAvailable" should default safely + mock_chronicle_request.return_value = {} + + result = search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertEqual(result["events"], []) + self.assertEqual(result["total_events"], 0) + self.assertFalse(result["more_data_available"]) + + @mock.patch("secops.chronicle.search.chronicle_request") + @mock.patch("builtins.print") + def test_search_udm_debug_prints(self, mock_print: mock.MagicMock, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.return_value = {"events": []} + + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + debug=True, + ) + + # Two prints: query + time range + self.assertGreaterEqual(mock_print.call_count, 2) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_propagates_api_error(self, mock_chronicle_request: mock.MagicMock) -> None: + mock_chronicle_request.side_effect = Exception("boom") + + with self.assertRaises(Exception) as ctx: + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertIn("boom", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() From c619211970b9d3c88cb602516a64d29ca2cb3c45 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Fri, 6 Feb 2026 16:06:37 +0530 Subject: [PATCH 4/6] feat: add timeout parameter support to chronicle_request and improve stats error handling --- src/secops/chronicle/stats.py | 7 +++- src/secops/chronicle/utils/request_utils.py | 9 +++++- tests/chronicle/test_stats.py | 36 +++++++++++++++++++++ tests/chronicle/utils/test_request_utils.py | 1 + 4 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 4defdfb..8510ef3 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -20,6 +20,7 @@ from secops.chronicle.utils.request_utils import ( chronicle_request, ) +from secops.exceptions import APIError if TYPE_CHECKING: from secops.chronicle.client import ChronicleClient @@ -60,7 +61,7 @@ def get_stats( APIError: If the API request fails """ # Unused parameters, kept for backward compatibility - _ = (max_events, case_insensitive, max_attempts, timeout) + _ = (max_events, case_insensitive, max_attempts) # Query parameters for the API call params = { @@ -76,8 +77,12 @@ def get_stats( endpoint_path=":udmSearch", api_version=APIVersion.V1ALPHA, params=params, + timeout=timeout, ) + if "stats" not in results: + raise APIError("No stats found in response") + # Process the stats results return process_stats_results(results["stats"]) diff --git a/src/secops/chronicle/utils/request_utils.py b/src/secops/chronicle/utils/request_utils.py index caf8d63..43f2d88 100644 --- a/src/secops/chronicle/utils/request_utils.py +++ b/src/secops/chronicle/utils/request_utils.py @@ -198,6 +198,7 @@ def chronicle_request( json: dict[str, Any] | None = None, expected_status: int | set[int] | tuple[int, ...] | list[int] = 200, error_message: str | None = None, + timeout: int | None = None, ) -> dict[str, Any] | list[Any]: """Perform an HTTP request and return JSON, raising APIError on failure. @@ -216,6 +217,7 @@ def chronicle_request( (e.g. 200) or an iterable of acceptable status codes (e.g. {200, 204}). If the response status is not acceptable, an APIError is raised. error_message: Optional base error message to include on failure + timeout: Optional timeout in seconds for the request Returns: Parsed JSON response. @@ -237,7 +239,12 @@ def chronicle_request( try: response = client.session.request( - method=method, url=url, params=params, json=json, headers=headers + method=method, + url=url, + params=params, + json=json, + headers=headers, + timeout=timeout, ) except GoogleAuthError as exc: base_msg = error_message or "Google authentication failed" diff --git a/tests/chronicle/test_stats.py b/tests/chronicle/test_stats.py index eb54f55..c5f803f 100644 --- a/tests/chronicle/test_stats.py +++ b/tests/chronicle/test_stats.py @@ -6,6 +6,7 @@ from secops.chronicle.stats import get_stats, process_stats_results from secops.chronicle.models import APIVersion +from secops.exceptions import APIError class TestChronicleStats(unittest.TestCase): @@ -157,6 +158,41 @@ def test_get_stats_timestamp_in_list(self, mock_chronicle_request: mock.MagicMoc self.assertIsInstance(result["rows"][0]["timestamp_array"][0], datetime) self.assertIsInstance(result["rows"][0]["timestamp_array"][1], datetime) + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_malformed_response_missing_stats_key( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that get_stats raises APIError when response missing stats.""" + mock_chronicle_request.return_value = { + "someOtherKey": "value", + "events": [], + } + + with self.assertRaises(APIError) as context: + get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertIn("No stats found in response", str(context.exception)) + + @mock.patch("secops.chronicle.stats.chronicle_request") + def test_get_stats_api_error_propagation( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that APIError from chronicle_request propagates correctly.""" + mock_chronicle_request.side_effect = APIError( + "API request failed: method=GET, url=test-url, " + "status=500, response={'error': 'Internal server error'}" + ) + + with self.assertRaises(APIError) as context: + get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertIn("API request failed", str(context.exception)) + self.assertIn("status=500", str(context.exception)) + def test_process_stats_results_empty(self) -> None: result = process_stats_results({}) self.assertEqual(result["total_rows"], 0) diff --git a/tests/chronicle/utils/test_request_utils.py b/tests/chronicle/utils/test_request_utils.py index 4c65620..6f8687a 100644 --- a/tests/chronicle/utils/test_request_utils.py +++ b/tests/chronicle/utils/test_request_utils.py @@ -87,6 +87,7 @@ def test_chronicle_request_success_json(client: Mock) -> None: params={"pageSize": 10}, json=None, headers=None, + timeout=None, ) From de65ab96d36ec6583d3ccc659b736a8ea17d88db Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Mon, 9 Feb 2026 11:38:30 +0530 Subject: [PATCH 5/6] chore: bump version to 0.34.3 and update changelog --- CHANGELOG.md | 5 +++++ pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e756380..3d41524 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.34.3] - 2026-02-09 +### Updated +- `get_stats()` to use `chronicle_request` helper for improved error handling and consistency +- `chronicle_request` helper function to support `timeout` parameter + ## [0.34.2] - 2026-02-03 ### Fixed - `ingest_log()` method to correctly skip log type validation when `force_log_type=True`, preventing unnecessary API calls diff --git a/pyproject.toml b/pyproject.toml index f8ac4b5..0ad6926 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.34.2" +version = "0.34.3" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" From 1426dd74ea1737ef2bc440b271ac4dcccb2b89ab Mon Sep 17 00:00:00 2001 From: PaperMtn Date: Mon, 9 Feb 2026 20:28:58 +0000 Subject: [PATCH 6/6] feat: implement as_list and timeout --- src/secops/chronicle/client.py | 2 ++ src/secops/chronicle/search.py | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index b32b6ae..9e2b588 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -872,6 +872,7 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, + as_list: bool = False, ) -> dict[str, Any]: """Search UDM events in Chronicle. @@ -905,6 +906,7 @@ def search_udm( max_attempts, timeout, debug, + as_list, ) def find_udm_field_values( diff --git a/src/secops/chronicle/search.py b/src/secops/chronicle/search.py index bb7f66c..c27f30b 100644 --- a/src/secops/chronicle/search.py +++ b/src/secops/chronicle/search.py @@ -36,7 +36,8 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, -) -> dict[str, Any]: + as_list: bool = False, +) -> dict[str, Any] | list[dict[str, Any]]: """Perform a UDM search query using the Chronicle V1alpha API. Args: @@ -50,6 +51,7 @@ def search_udm( for backwards compatibility) timeout: Timeout in seconds for each API request (default: 30) debug: Print debug information during execution + as_list: Whether to return results as a list or dictionary Returns: Dict containing the search results with events @@ -82,8 +84,11 @@ def search_udm( endpoint_path=":udmSearch", api_version=APIVersion.V1ALPHA, params=params, + timeout=timeout, ) + if as_list: + return result.get("events", []) return { "events": result.get("events", []), "total_events": len(result.get("events", [])),