Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions spp_approval/models/approval_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,38 @@ def action_approve(self, comment=None):
record._check_can_approve()
record._do_approve(comment=comment)

def _action_approve_system(self, comment=None):
"""System-initiated approval bypassing user permission checks.

Use this for automated approvals triggered by system events (e.g., DCI
verification match, scheduled jobs) where there is no human approver.

This method uses sudo() to bypass access controls and skips the
_check_can_approve() permission validation.

The underscore prefix is intentional — it prevents this method from being
callable via Odoo's RPC interface, since it must not be exposed to users.

Args:
comment: Optional approval comment for audit trail
"""
for record in self:
if record.approval_state != "pending":
_logger.warning(
"Skipping system approval for %s %s: state is %s, not pending",
record._name,
record.id,
record.approval_state,
)
continue
record.sudo()._do_approve(comment=comment, auto=True) # nosemgrep: odoo-sudo-without-context
_logger.info(
"System auto-approved %s %s: %s",
record._name,
record.id,
comment or "(no comment)",
)

def _do_approve(self, comment=None, auto=False):
"""Internal method to perform approval."""
self.ensure_one()
Expand Down
10 changes: 6 additions & 4 deletions spp_dci_client/models/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,12 @@ def get_oauth2_token(self, force_refresh=False):
_logger.info("Requesting new OAuth2 token for data source: %s", self.code)

try:
# Use sudo() to access OAuth2 credentials which are restricted to administrators
sudo_self = self.sudo() # nosemgrep: odoo-sudo-without-context
token_data = {
"grant_type": "client_credentials",
"client_id": self.oauth2_client_id,
"client_secret": self.oauth2_client_secret,
"client_id": sudo_self.oauth2_client_id,
"client_secret": sudo_self.oauth2_client_secret,
}

if self.oauth2_scope:
Expand Down Expand Up @@ -358,9 +360,9 @@ def get_oauth2_token(self, force_refresh=False):
if not access_token:
raise UserError(_("OAuth2 token response did not contain access_token"))

# Cache token with expiry
# Cache token with expiry (use sudo to write to restricted model)
expires_at = now + timedelta(seconds=expires_in)
self.write(
sudo_self.write(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing sudo in token cache clearing breaks non-admin 401 retry

Medium Severity

The get_oauth2_token method now correctly uses sudo_self.write() to cache the token for non-admin users, but clear_oauth2_token_cache() still calls self.write() without sudo(). Per ir.model.access.csv, non-admin users have read-only access to spp.dci.data.source (perm_write=0). When a non-admin user hits a 401 response in _make_request, the call to self.data_source.clear_oauth2_token_cache() raises an AccessError, preventing the token-refresh retry from ever executing. Before this PR, non-admin users couldn't reach this code path at all (they'd fail at the credential read), so the sudo changes make this a newly reachable failure.

Fix in Cursor Fix in Web

{
"_oauth2_access_token": access_token,
"_oauth2_token_expires_at": expires_at,
Expand Down
125 changes: 124 additions & 1 deletion spp_dci_client/services/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""DCI Client Service for making signed API requests."""

import json
import logging
import time
import uuid
from datetime import UTC, datetime
from typing import Any
Expand Down Expand Up @@ -904,6 +906,13 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
# Get headers from data source (includes auth)
headers = self.data_source.get_headers()

# Track timing and result for outgoing log
start_time = time.monotonic()
log_status = "success"
log_status_code = None
log_response_data = None
log_error_detail = None

try:
_logger.info(
"Making DCI request to %s (action: %s)",
Expand All @@ -930,6 +939,13 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
# Handle 401 Unauthorized - try refreshing token once
if response.status_code == 401 and _retry_auth and self.data_source.auth_type == "oauth2":
_logger.warning("Got 401 Unauthorized, clearing OAuth2 token cache and retrying with fresh token")
log_status = "http_error"
log_status_code = 401
log_error_detail = "401 Unauthorized - retrying with fresh token"
try:
log_response_data = response.json()
except json.JSONDecodeError:
log_response_data = None
self.data_source.clear_oauth2_token_cache()
return self._make_request(endpoint, envelope, _retry_auth=False)

Expand All @@ -938,6 +954,8 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)

# Parse response
response_data = response.json()
log_status_code = response.status_code
log_response_data = response_data

_logger.info(
"DCI request successful (status: %s, message_id: %s)",
Expand All @@ -949,18 +967,23 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
return response_data

except httpx.HTTPStatusError as e:
log_status = "http_error"
log_status_code = e.response.status_code

# Log technical details for troubleshooting
technical_detail = f"DCI request failed with status {e.response.status_code}"
response_text = e.response.text
try:
error_data = e.response.json()
log_response_data = error_data
if "header" in error_data and "status_reason_message" in error_data["header"]:
technical_detail += f": {error_data['header']['status_reason_message']}"
else:
technical_detail += f": {response_text}"
except Exception:
except (json.JSONDecodeError, KeyError, TypeError):
technical_detail += f": {response_text}"

log_error_detail = technical_detail
_logger.error(technical_detail)
_logger.error("Full response body: %s", response_text)
_logger.error("Request envelope was: %s", envelope)
Expand All @@ -978,26 +1001,126 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
error_str = str(e).lower()
if "timeout" in error_str or "timed out" in error_str:
connection_type = "timeout"
log_status = "timeout"
elif "ssl" in error_str or "certificate" in error_str:
connection_type = "ssl"
log_status = "connection_error"
elif "name or service not known" in error_str or "nodename nor servname" in error_str:
connection_type = "dns"
log_status = "connection_error"
else:
connection_type = "connection"
log_status = "connection_error"

log_error_detail = technical_detail

# Show user-friendly message
user_msg = format_connection_error(connection_type, technical_detail)
raise UserError(user_msg) from e

except Exception as e:
log_status = "error"

# Log technical details for troubleshooting
technical_detail = f"Unexpected error during DCI request: {str(e)}"
log_error_detail = technical_detail
_logger.error(technical_detail, exc_info=True)

# Show generic user-friendly message
user_msg = _("An unexpected error occurred. Please contact your administrator.")
raise UserError(user_msg) from e

finally:
duration_ms = int((time.monotonic() - start_time) * 1000)
self._log_outgoing_call(
url=url,
endpoint=endpoint,
envelope=envelope,
response_data=log_response_data,
status_code=log_status_code,
duration_ms=duration_ms,
status=log_status,
error_detail=log_error_detail,
)

# =========================================================================
# OUTGOING LOG
# =========================================================================

def _log_outgoing_call(
self,
url: str,
endpoint: str,
envelope: dict,
response_data: dict | None,
status_code: int | None,
duration_ms: int,
status: str,
error_detail: str | None,
):
"""Log an outgoing API call to spp.api.outgoing.log (soft dependency).

Uses runtime check to avoid hard manifest dependency on spp_api_v2.
Uses a separate database cursor so log entries persist even when the
caller's transaction is rolled back (e.g., on UserError).
Logging failures are swallowed so they never block the actual request.
"""
try:
if "spp.api.outgoing.log" not in self.env:
return

from odoo.addons.spp_api_v2.services.outgoing_api_log_service import OutgoingApiLogService

# Capture values from the current env before opening a new cursor,
# since self.data_source won't be accessible from the new cursor's env.
service_code = getattr(self.data_source, "code", None) or "dci"
origin_record_id = self.data_source.id if hasattr(self.data_source, "id") else None
user_id = self.env.uid
sanitized_envelope = self._copy_envelope_for_log(envelope)

# Use a separate cursor so log entries survive transaction rollback.
with self.env.registry.cursor() as new_cr:
new_env = self.env(cr=new_cr)
service = OutgoingApiLogService(
new_env,
service_name="DCI Client",
service_code=service_code,
user_id=user_id,
)

service.log_call(
url=url,
endpoint=endpoint,
http_method="POST",
request_summary=sanitized_envelope,
response_summary=response_data,
response_status_code=status_code,
duration_ms=duration_ms,
origin_model="spp.dci.data.source",
origin_record_id=origin_record_id,
status=status,
error_detail=error_detail,
)
except Exception:
_logger.warning("Failed to log outgoing API call (non-blocking)", exc_info=True)

def _copy_envelope_for_log(self, envelope: dict) -> dict | None:
"""Copy the request envelope for audit log storage.

Returns a shallow copy suitable for JSON storage. The cryptographic
signature is preserved for auditability (non-repudiation).

Args:
envelope: Request envelope dict

Returns:
Copy of the envelope, or None if input is falsy
"""
if not envelope:
return None

return dict(envelope)

# =========================================================================
# HELPER METHODS
# =========================================================================
Expand Down
1 change: 1 addition & 0 deletions spp_dci_client/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

from . import test_data_source
from . import test_client_service
from . import test_outgoing_log_integration
Loading