Skip to content
1 change: 1 addition & 0 deletions src/sap_cloud_sdk/core/telemetry/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Module(str, Enum):
AUDITLOG = "auditlog"
DESTINATION = "destination"
OBJECTSTORE = "objectstore"
DMS = "dms"

def __str__(self) -> str:
return self.value
12 changes: 12 additions & 0 deletions src/sap_cloud_sdk/core/telemetry/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,17 @@ class Operation(str, Enum):
AICORE_SET_CONFIG = "set_aicore_config"
AICORE_AUTO_INSTRUMENT = "auto_instrument"


# DMS Operations
DMS_ONBOARD_REPOSITORY = "onboard_repository"
DMS_GET_REPOSITORY = "get_repository"
DMS_GET_ALL_REPOSITORIES = "get_all_repositories"
DMS_UPDATE_REPOSITORY = "update_repository"
DMS_DELETE_REPOSITORY = "delete_repository"
DMS_CREATE_CONFIG = "create_config"
DMS_GET_CONFIGS = "get_configs"
DMS_UPDATE_CONFIG = "update_config"
DMS_DELETE_CONFIG = "delete_config"

def __str__(self) -> str:
return self.value
19 changes: 19 additions & 0 deletions src/sap_cloud_sdk/dms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Optional
from sap_cloud_sdk.dms.model import DMSCredentials
from sap_cloud_sdk.dms.client import DMSClient
from sap_cloud_sdk.dms.config import load_sdm_config_from_env_or_mount


def create_client(
*,
instance: Optional[str] = None,
dms_cred: Optional[DMSCredentials] = None
):
if dms_cred is not None:
return DMSClient(dms_cred)
if instance is not None:
return DMSClient(load_sdm_config_from_env_or_mount(instance))

Check failure on line 16 in src/sap_cloud_sdk/dms/__init__.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (W293)

src/sap_cloud_sdk/dms/__init__.py:16:1: W293 Blank line contains whitespace help: Remove whitespace from blank line
raise ValueError("No configuration provided. Please provide either instance name, config, or dms_cred.")

__all__ = ["create_client"]

Check failure on line 19 in src/sap_cloud_sdk/dms/__init__.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (W292)

src/sap_cloud_sdk/dms/__init__.py:19:28: W292 No newline at end of file help: Add trailing newline
93 changes: 93 additions & 0 deletions src/sap_cloud_sdk/dms/_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging
import time
import requests
from requests.exceptions import RequestException
from typing import Optional, TypedDict
from sap_cloud_sdk.dms.exceptions import DMSError, DMSConnectionError, DMSPermissionDeniedException
from sap_cloud_sdk.dms.model import DMSCredentials

logger = logging.getLogger(__name__)


class _TokenResponse(TypedDict):
access_token: str
expires_in: int


class _CachedToken:
def __init__(self, token: str, expires_at: float) -> None:
self.token = token
self.expires_at = expires_at

def is_valid(self) -> bool:
return time.monotonic() < self.expires_at - 30


# TODO: limit number of access tokens in cache to 10
class Auth:
"""Fetches and caches OAuth2 access tokens for DMS service requests."""

def __init__(self, credentials: DMSCredentials) -> None:
self._credentials = credentials
self._cache: dict[str, _CachedToken] = {}

def get_token(self, tenant_subdomain: Optional[str] = None) -> str:
cache_key = tenant_subdomain or "technical"

cached = self._cache.get(cache_key)
if cached and cached.is_valid():
logger.debug("Using cached token for key '%s'", cache_key)
return cached.token

logger.debug("Fetching new token for key '%s'", cache_key)
token_url = self._resolve_token_url(tenant_subdomain)
token = self._fetch_token(token_url)

self._cache[cache_key] = _CachedToken(
token=token["access_token"],
expires_at=time.monotonic() + token.get("expires_in", 3600),
)
logger.debug("Token cached for key '%s'", cache_key)
return self._cache[cache_key].token

def _resolve_token_url(self, tenant_subdomain: Optional[str]) -> str:
if not tenant_subdomain:
return self._credentials.token_url
logger.debug("Resolving token URL for tenant '%s'", tenant_subdomain)
return self._credentials.token_url.replace(
self._credentials.identityzone,
tenant_subdomain,
)

def _fetch_token(self, token_url: str) -> _TokenResponse:
try:
response = requests.post(
f"{token_url}/oauth/token",
data={
"grant_type": "client_credentials",
"client_id": self._credentials.client_id,
"client_secret": self._credentials.client_secret,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=10,
)
response.raise_for_status()
except requests.exceptions.ConnectionError as e:
logger.error("Failed to connect to token endpoint")
raise DMSConnectionError("Failed to connect to the authentication server") from e
except requests.exceptions.HTTPError as e:
status = e.response.status_code if e.response is not None else None
logger.error("Token request failed with status %s", status)
if status in (401, 403):
raise DMSPermissionDeniedException("Authentication failed — invalid client credentials", status) from e
raise DMSError("Failed to obtain access token", status) from e
except RequestException as e:
logger.error("Unexpected error during token fetch")
raise DMSConnectionError("Unexpected error during authentication") from e

payload: _TokenResponse = response.json()
if not payload.get("access_token"):
raise DMSError("Token response missing access_token")

logger.debug("Token fetched successfully")
return payload

Check failure on line 93 in src/sap_cloud_sdk/dms/_auth.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (W292)

src/sap_cloud_sdk/dms/_auth.py:93:23: W292 No newline at end of file help: Add trailing newline
2 changes: 2 additions & 0 deletions src/sap_cloud_sdk/dms/_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
REPOSITORIES = "/rest/v2/repositories"
CONFIGS = "/rest/v2/configs"

Check failure on line 2 in src/sap_cloud_sdk/dms/_endpoints.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (W292)

src/sap_cloud_sdk/dms/_endpoints.py:2:41: W292 No newline at end of file help: Add trailing newline
175 changes: 175 additions & 0 deletions src/sap_cloud_sdk/dms/_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import logging
from typing import Any, Optional
from requests import Response
import requests
from requests.exceptions import RequestException
from sap_cloud_sdk.dms._auth import Auth
from sap_cloud_sdk.dms.exceptions import (
DMSError,
DMSConnectionError,
DMSInvalidArgumentException,
DMSObjectNotFoundException,
DMSPermissionDeniedException,
DMSRuntimeException,
)
from sap_cloud_sdk.dms.model import UserClaim

logger = logging.getLogger(__name__)


class HttpInvoker:
"""Low-level HTTP layer. Injects auth headers and enforces timeouts."""

def __init__(
self,
auth: Auth,
base_url: str,
connect_timeout: int | None = None,
read_timeout: int | None = None,
) -> None:
self._auth = auth
self._base_url = base_url.rstrip("/")
self._connect_timeout = connect_timeout or 10
self._read_timeout = read_timeout or 30

def get(
self,
path: str,
tenant_subdomain: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
user_claim: Optional[UserClaim] = None,
) -> Response:
logger.debug("GET %s", path)
return self._handle(self._execute(
lambda: requests.get(
f"{self._base_url}{path}",
headers=self._merged_headers(tenant_subdomain, headers, user_claim),
timeout=(self._connect_timeout, self._read_timeout),
)
))

def post(
self,
path: str,
payload: dict[str, Any],
tenant_subdomain: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
user_claim: Optional[UserClaim] = None,
) -> Response:
logger.debug("POST %s", path)
return self._handle(self._execute(
lambda: requests.post(
f"{self._base_url}{path}",
headers=self._merged_headers(tenant_subdomain, headers, user_claim),
json=payload,
timeout=(self._connect_timeout, self._read_timeout),
)
))

def put(
self,
path: str,
payload: dict[str, Any],
tenant_subdomain: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
user_claim: Optional[UserClaim] = None,
) -> Response:
logger.debug("PUT %s", path)
return self._handle(self._execute(
lambda: requests.put(
f"{self._base_url}{path}",
headers=self._merged_headers(tenant_subdomain, headers, user_claim),
json=payload,
timeout=(self._connect_timeout, self._read_timeout),
)
))

def delete(
self,
path: str,
tenant_subdomain: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
user_claim: Optional[UserClaim] = None,
) -> Response:
logger.debug("DELETE %s", path)
return self._handle(self._execute(
lambda: requests.delete(
f"{self._base_url}{path}",
headers=self._merged_headers(tenant_subdomain, headers, user_claim),
timeout=(self._connect_timeout, self._read_timeout),
)
))

def _execute(self, fn: Any) -> Response:
"""Execute an HTTP call, wrapping network errors into DMSConnectionError."""
try:
return fn()
except requests.exceptions.ConnectionError as e:
logger.error("Connection error during HTTP request")
raise DMSConnectionError("Failed to connect to the DMS service") from e
except requests.exceptions.Timeout as e:
logger.error("Request timed out")
raise DMSConnectionError("Request to DMS service timed out") from e
except RequestException as e:
logger.error("Unexpected network error")
raise DMSConnectionError("Unexpected network error") from e

def _default_headers(self, tenant_subdomain: Optional[str] = None) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._auth.get_token(tenant_subdomain)}",
"Content-Type": "application/json",
"Accept": "application/json",
}

def _user_claim_headers(self, user_claim: Optional[UserClaim]) -> dict[str, str]:
if not user_claim:
return {}
headers: dict[str, str] = {}
if user_claim.x_ecm_user_enc:
headers["X-EcmUserEnc"] = user_claim.x_ecm_user_enc
if user_claim.x_ecm_add_principals:
headers["X-EcmAddPrincipals"] = ";".join(user_claim.x_ecm_add_principals)
return headers

def _merged_headers(
self,
tenant_subdomain: Optional[str],
overrides: Optional[dict[str, str]],
user_claim: Optional[UserClaim] = None,
) -> dict[str, str]:
return {
**self._default_headers(tenant_subdomain),
**self._user_claim_headers(user_claim),
**(overrides or {}),
}

def _handle(self, response: Response) -> Response:
logger.debug("Response status: %s", response.status_code)
if response.status_code in (200, 201, 204):
return response

# error_content kept for debugging but not surfaced in the exception message
error_content = response.text
logger.warning("Request failed with status %s", response.status_code)

match response.status_code:
case 400:
raise DMSInvalidArgumentException(
"Request contains invalid or disallowed parameters", 400, error_content
)
case 401 | 403:
raise DMSPermissionDeniedException(
"Access denied — invalid or expired token", response.status_code, error_content
)
case 404:
raise DMSObjectNotFoundException(
"The requested resource was not found", 404, error_content
)
case 500:
raise DMSRuntimeException(
"The DMS service encountered an internal error", 500, error_content
)
case _:
raise DMSError(
f"Unexpected response from DMS service : "+error_content, response.status_code, error_content

Check failure on line 174 in src/sap_cloud_sdk/dms/_http.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (F541)

src/sap_cloud_sdk/dms/_http.py:174:21: F541 f-string without any placeholders help: Remove extraneous `f` prefix
)

Check failure on line 175 in src/sap_cloud_sdk/dms/_http.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (W292)

src/sap_cloud_sdk/dms/_http.py:175:18: W292 No newline at end of file help: Add trailing newline
Loading
Loading