Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datadog_sync/model/monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,23 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona
else:
# Use default connect_id method in base class when not handling special case for `query`
return super(Monitors, self).connect_id(key, r_obj, resource_to_connect)

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
# Mirror of connect_id -- keep in sync when connect_id changes.
if key == "query" and r_obj.get("type") == "composite" and resource_to_connect != "service_level_objectives":
return re.findall("[0-9]+", r_obj[key])
elif key == "query" and resource_to_connect == "service_level_objectives" and r_obj.get("type") == "slo alert":
if res := re.search(r'(?:error_budget|burn_rate)\("(.*?)"\)\.', r_obj[key]):
return [res.group(1)]
return []
elif key == "query":
return []
elif key == "principals":
type_map = {"user": "users", "role": "roles", "team": "teams"}
return [
_id
for p in r_obj[key]
for _type, _id in [p.split(":", 1)]
if type_map.get(_type) == resource_to_connect
]
return super(Monitors, self).extract_source_ids(key, r_obj, resource_to_connect)
16 changes: 16 additions & 0 deletions datadog_sync/model/restriction_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,19 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona
failed_connections.append(_id)

return failed_connections

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
# Mirror of connect_id -- keep in sync when connect_id changes.
if key == "id":
_type, _id = r_obj[key].split(":", 1)
type_map = {"dashboard": "dashboards", "slo": "service_level_objectives", "notebook": "notebooks"}
return [_id] if type_map.get(_type) == resource_to_connect else []
elif key == "principals":
type_map = {"user": "users", "role": "roles", "team": "teams"}
return [
_id
for p in r_obj[key]
for _type, _id in [p.split(":", 1)]
if type_map.get(_type) == resource_to_connect
]
return super().extract_source_ids(key, r_obj, resource_to_connect)
14 changes: 14 additions & 0 deletions datadog_sync/model/service_level_objectives.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,17 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona
if not found:
failed_connections.append(_id)
return failed_connections

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
# Mirror of connect_id -- keep in sync when connect_id changes.
# connect_id checks each monitor_id against monitors destination state first,
# then falls back to synthetics_tests using suffix match on composite keys
# ('{public_id}#{monitor_id}'). For source discovery, exclude IDs that are
# synthetics monitor IDs to prevent false ("monitors", id) misses.
if key != "monitor_ids":
return super().extract_source_ids(key, r_obj, resource_to_connect)
ids = [str(obj) for obj in r_obj[key]]
if resource_to_connect == "monitors":
synthetics = self.config.state.source["synthetics_tests"]
return [_id for _id in ids if not any(k.endswith("#" + _id) for k in synthetics)]
return super().extract_source_ids(key, r_obj, resource_to_connect)
50 changes: 43 additions & 7 deletions datadog_sync/model/synthetics_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SyntheticsTests(BaseResource):
"rum_applications": ["options.rumSettings.applicationId"],
"synthetics_mobile_applications": [
"options.mobileApplication.applicationId",
"options.mobileApplication.referenceId",
],
"synthetics_mobile_applications_versions": [
"mobileApplicationsVersions",
Expand Down Expand Up @@ -90,7 +91,10 @@ class SyntheticsTests(BaseResource):
network_base_path: str = "/api/v2/synthetics/tests/network"
network_delete_path: str = "/api/v2/synthetics/tests/bulk-delete"
get_params = {"include_metadata": "true"}
versions: List = []

def __init__(self, config):
super().__init__(config)
self.versions: Optional[List[Dict]] = None

@staticmethod
def _unwrap_network_response(resp: Dict) -> Dict:
Expand Down Expand Up @@ -144,13 +148,18 @@ async def _delete_test(self, client: CustomClient, test_type: str, public_id: st
body = {"public_ids": [public_id]}
await client.post(self.resource_config.base_path + "/delete", body)

async def _ensure_mobile_versions_loaded(self, client: CustomClient) -> List[Dict]:
if self.versions is None:
versions_resource = SyntheticsMobileApplicationsVersions(self.config)
self.versions = await versions_resource.get_resources(client)
return self.versions

async def get_resources(self, client: CustomClient) -> List[Dict]:
resp = await client.get(
self.resource_config.base_path,
params=self.get_params,
)
versions = SyntheticsMobileApplicationsVersions(self.config)
self.versions = await versions.get_resources(client)
await self._ensure_mobile_versions_loaded(client)
return resp["tests"]

async def import_resource(self, _id: Optional[str] = None, resource: Optional[Dict] = None) -> Tuple[str, Dict]:
Expand Down Expand Up @@ -197,9 +206,10 @@ async def import_resource(self, _id: Optional[str] = None, resource: Optional[Di
self.mobile_test_path.format(_id),
params=self.get_params,
)
mobile_versions = await self._ensure_mobile_versions_loaded(source_client)
versions = [
i["id"]
for i in self.versions
for i in mobile_versions
if i["application_id"] == resource["options"]["mobileApplication"]["applicationId"]
]
resource["mobileApplicationsVersions"] = list(set(versions))
Expand Down Expand Up @@ -407,11 +417,37 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona
else:
failed_connections.append(_id)
return failed_connections
elif resource_to_connect == "synthetics_mobile_applications" and key == "referenceId":
# referenceId is an application ID only when referenceType is "latest".
if r_obj.get("referenceType") == "latest":
return super(SyntheticsTests, self).connect_id(key, r_obj, resource_to_connect)
return []
elif resource_to_connect == "synthetics_mobile_applications_versions" and key == "referenceId":
# When referenceType is "latest", referenceId contains the application ID, not a version ID.
# Connect it against synthetics_mobile_applications instead.
# referenceId is a version ID only when referenceType is not "latest".
if r_obj.get("referenceType") == "latest":
return super(SyntheticsTests, self).connect_id(key, r_obj, "synthetics_mobile_applications")
return []
return super(SyntheticsTests, self).connect_id(key, r_obj, resource_to_connect)
else:
return super(SyntheticsTests, self).connect_id(key, r_obj, resource_to_connect)

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
# Mirror of connect_id -- keep in sync when connect_id changes.
# Only synthetics_private_locations and mobile application versions need special handling.
# rum_applications, synthetics_tests (subtests), synthetics_global_variables, roles, and
# synthetics_mobile_applications (applicationId key) all use plain IDs at the leaf —
# base extract_source_ids handles them. For synthetics_tests subtests, _dep_in_source_state
# handles composite key prefix matching ('{public_id}#{monitor_id}' keys).
if resource_to_connect == "synthetics_private_locations":
pl = self.config.resources["synthetics_private_locations"]
return [str(_id) for _id in r_obj[key] if pl.pl_id_regex.match(str(_id))]
elif resource_to_connect == "synthetics_mobile_applications" and key == "referenceId":
# referenceId is an application ID only when referenceType is "latest".
if r_obj.get("referenceType") == "latest":
return super(SyntheticsTests, self).extract_source_ids(key, r_obj, resource_to_connect)
return []
elif resource_to_connect == "synthetics_mobile_applications_versions" and key == "referenceId":
# referenceId is a version ID only when referenceType is not "latest".
if r_obj.get("referenceType") == "latest":
return []
return super(SyntheticsTests, self).extract_source_ids(key, r_obj, resource_to_connect)
return super(SyntheticsTests, self).extract_source_ids(key, r_obj, resource_to_connect)
8 changes: 8 additions & 0 deletions datadog_sync/model/team_memberships.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,11 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona
else:
failed_connections.append(_id)
return failed_connections

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
# Mirror of connect_id -- keep in sync when connect_id changes.
_type = r_obj["type"]
type_map = {"users": "users", "team": "teams"}
if type_map.get(_type) == resource_to_connect:
return [r_obj["id"]]
return []
14 changes: 14 additions & 0 deletions datadog_sync/utils/base_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optiona

return failed_connections

def extract_source_ids(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
"""Extract dependency IDs referenced at r_obj[key] for resource_to_connect.

Source-only: does NOT check destination state, does NOT mutate r_obj.
Override in subclasses with custom connect_id logic (regex, prefix
parsing, type dispatch, etc.).
Mirror of connect_id -- keep in sync when connect_id changes.
"""
if not r_obj.get(key):
return None
if isinstance(r_obj[key], list):
return [str(v) for v in r_obj[key]]
return [str(r_obj[key])]

def connect_resources(self, _id: str, resource: Dict) -> None:
if not self.resource_config.resource_connections:
return
Expand Down
166 changes: 166 additions & 0 deletions datadog_sync/utils/resources_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,19 @@ async def _diffs_worker_cb(self, q_item: List) -> None:

async def import_resources(self) -> None:
await self.import_resources_without_saving()

if self.config.force_missing_dependencies:
missing = self._discover_missing_dependencies()
if missing:
self.config.logger.info(f"importing {len(missing)} missing dependencies...")
await self.worker.init_workers(self._import_missing_dep_cb, None, len(missing))
for item in missing:
self.worker.work_queue.put_nowait(item)
await self.worker.schedule_workers()
self.config.logger.info("finished importing missing dependencies")
else:
self.config.logger.info("no missing dependencies found")

self.config.state.dump_state(Origin.SOURCE)

async def import_resources_without_saving(self) -> None:
Expand Down Expand Up @@ -738,6 +751,159 @@ def _log_stale_summary(self, stale: Dict[Tuple[Origin, str], Set[str]]) -> None:
for fn in sorted(filenames):
self.config.logger.debug(f" stale: {origin.value}/{fn}")

def _source_state_key(self, dep_type: str, dep_id: str) -> Optional[str]:
"""Resolve a dependency ID to its canonical source-state key.

Returns the actual key in config.state.source[dep_type], or None if
the dependency is not present in source state.

Handles composite keys: synthetics_tests uses '{public_id}#{monitor_id}'
keys in source state, so an exact match on a bare public_id fails.
Prefix matching resolves the canonical key and ensures the BFS queues
the correct key for subsequent _source_dependencies_for_resource lookups.
"""
source = self.config.state.source.get(dep_type, {})
if dep_id in source:
return dep_id
if dep_type == "synthetics_tests":
return next((k for k in source if k.startswith(dep_id + "#")), None)
return None

def _source_dependencies_for_resource(self, resource_type: str, _id: str) -> Set[Tuple[str, str]]:
"""Return all (dep_type, dep_id) referenced by a source resource.

Uses extract_source_ids — source-only, does NOT check destination state,
does NOT mutate the resource.

Does NOT call ensure_resource_loaded(). During import with
--force-missing-dependencies, resources are fetched from the API and
stored directly in source state. ensure_resource_loaded() remains in
_resource_connections() for the sync-time path only.
"""
deps: Set[Tuple[str, str]] = set()
r_class = self.config.resources[resource_type]
if not r_class.resource_config.resource_connections:
return deps
resource = deepcopy(self.config.state.source[resource_type][_id])
for dep_type, paths in r_class.resource_config.resource_connections.items():
for path in paths:
if not path:
continue # empty attr path (e.g., SLO's "synthetics_tests": [])
ids = find_attr(path, dep_type, resource, r_class.extract_source_ids)
for dep_id in ids or []:
deps.add((dep_type, dep_id))
return deps

def _discover_missing_dependencies(self) -> Set[Tuple[str, str]]:
"""Scan imported resources and all reachable transitive deps for
dependency IDs not yet in source state.

BFS closure walk: seeds from resources_arg nodes, follows edges through
already-present source resources so that transitive deps of present
resources are also discovered. Uses _dep_in_source_state() for composite
key handling (e.g., synthetics_tests '{public_id}#{monitor_id}' keys).

Uses extract_source_ids (source-only) rather than connect_id (which
checks destination state) to avoid stale destination state causing deps
to appear resolved when they haven't been fetched into source state.
"""
missing: Set[Tuple[str, str]] = set()
seen: Set[Tuple[str, str]] = set()
queue = [(rt, _id) for rt in self.config.resources_arg for _id in list(self.config.state.source[rt].keys())]
while queue:
rt, _id = queue.pop()
if (rt, _id) in seen:
continue
seen.add((rt, _id))
r_class = self.config.resources.get(rt)
if not r_class or not r_class.resource_config.resource_connections:
continue
for dep_type, dep_id in self._source_dependencies_for_resource(rt, _id):
source_key = self._source_state_key(dep_type, dep_id)
if source_key is not None:
queue.append((dep_type, source_key)) # canonical key — scan for ITS deps
else:
missing.add((dep_type, dep_id))
return missing

async def _import_missing_dep_cb(self, q_item: Tuple[str, str]) -> None:
"""Import a single missing dependency by ID, then recursively discover
further missing deps and enqueue them.

Import-time equivalent of _force_missing_dep_import_cb (sync-time).
See that method for the sync-time version. Key differences:
- Does NOT populate _dependency_graph (import has no sync graph)
- Only ensures resources land in source state

Metrics/counters are intentionally omitted, matching the sync-time
callback convention.

Circular dependency safety: _import_resource stores the resource in
config.state.source[resource_type][_id] (base_resource.py) BEFORE
this method returns. When _resource_connections discovers transitive deps,
the source-state check prevents re-enqueuing already-imported resources.

Race condition note: The dedup guard below is not atomic with the
subsequent _import_resource call. Two workers can pass the guard
simultaneously for the same dep. This is benign — _import_resource
is idempotent (fetches the same data and overwrites the same key).
The check is a best-effort optimization, not a correctness guarantee.
Same accepted risk exists in _force_missing_dep_import_cb.

Cancel callback note: Workers use cancel_cb=None (default: queue.empty).
There is a narrow window where the queue empties before a callback
enqueues transitive deps, causing premature shutdown. This risk is
slightly higher than in _force_missing_dep_import_cb (sync-time)
because during import, transitive deps from newly-imported resources
are MORE likely (sync-time already did a full scan via
get_dependency_graph). In practice this is acceptable because imports
are idempotent — a second run resolves any missed deps.
"""
resource_type, _id = q_item

# Guard: unknown resource type
if resource_type not in self.config.resources:
self.config.logger.warning(f"skipping unknown dependency type: {resource_type}", _id=_id)
return

# Skip if already imported (best-effort dedup + circular dep safety).
# Uses _source_state_key to handle composite keys (e.g., synthetics_tests
# '{public_id}#{monitor_id}') so a bare public_id deduplicates correctly.
if self._source_state_key(resource_type, _id) is not None:
return

try:
_id = await self.config.resources[resource_type]._import_resource(_id=_id)
self._emit(resource_type, _id, "import", "success")
except SkipResource as e:
self._emit(resource_type, _id, "import", "skipped", reason=self._sanitize_reason(e))
self.config.logger.info(f"skipping dependency: {str(e)}", resource_type=resource_type, _id=_id)
return
except CustomClientHTTPError as e:
self._emit(resource_type, _id, "import", "failure", reason=self._sanitize_reason(e))
self.config.logger.error(f"error importing dependency: {str(e)}", resource_type=resource_type, _id=_id)
return
except Exception as e:
self._emit(resource_type, _id, "import", "failure", reason=self._sanitize_reason(e))
self.config.logger.error(f"error importing dependency: {str(e)}", resource_type=resource_type, _id=_id)
return

# Recursively discover transitive deps from the newly imported resource.
# Uses _source_dependencies_for_resource (source-only) rather than
# _resource_connections (destination-state check) to avoid stale
# destination state masking missing deps.
try:
r_class = self.config.resources[resource_type]
if r_class.resource_config.resource_connections:
for dep_type, dep_id in self._source_dependencies_for_resource(resource_type, _id):
if self._source_state_key(dep_type, dep_id) is None:
self.worker.work_queue.put_nowait((dep_type, dep_id))
except Exception as e:
self.config.logger.error(
f"error discovering transitive deps: {str(e)}", resource_type=resource_type, _id=_id
)

# See also: _import_missing_dep_cb (import-time equivalent)
async def _force_missing_dep_import_cb(self, q_item: List):
resource_type, _id = q_item
try:
Expand Down
Loading
Loading