diff --git a/backend/api_v2/notification.py b/backend/api_v2/notification.py index 733084c637..d5d2dca9df 100644 --- a/backend/api_v2/notification.py +++ b/backend/api_v2/notification.py @@ -1,6 +1,7 @@ import logging -from notification_v2.helper import NotificationHelper +from notification_v2.enums import FAILURE_STATUSES +from notification_v2.helper import dispatch_with_delivery_mode from notification_v2.models import Notification from pipeline_v2.dto import PipelineStatusPayload from workflow_manager.workflow_v2.models.execution import WorkflowExecution @@ -16,11 +17,33 @@ def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) -> self.api = api self.workflow_execution = workflow_execution - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for api {self.api}") + def send(self) -> None: + # Failure if the run hit a non-success terminal state OR any file errored. + # Partial-success runs land as status=COMPLETED with failed_files>0, so the + # status check alone misses them — see callback aggregation rules. + failed_files = self.workflow_execution.failed_files or 0 + is_failure = ( + self.workflow_execution.status in FAILURE_STATUSES or failed_files > 0 + ) + if not is_failure: + # Success path: skip rows that opted into failure-only alerts. + self.notifications = self.notifications.filter(notify_on_failures=False) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for api %s (status=%s, failed_files=%s)", + self.api, + self.workflow_execution.status, + failed_files, + ) return - logger.info(f"Sending api status notification for api {self.api}") + logger.info( + "Sending api status notification for api %s (status=%s, successful=%s, failed=%s)", + self.api, + self.workflow_execution.status, + self.workflow_execution.successful_files or 0, + failed_files, + ) payload_dto = PipelineStatusPayload( type="API", @@ -29,8 +52,13 @@ def send(self): status=self.workflow_execution.status, execution_id=self.workflow_execution.id, error_message=self.workflow_execution.error_message, + total_files=self.workflow_execution.total_files, + successful_files=self.workflow_execution.successful_files, + failed_files=failed_files, ) - NotificationHelper.send_notification( - notifications=self.notifications, payload=payload_dto.to_dict() + dispatch_with_delivery_mode( + list(self.notifications), + payload_dto.to_dict(), + error_context=f"api={self.api.id}", ) diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index a77b44adaf..73969ec13d 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -219,6 +219,15 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | INDEXING_FLAG_TTL = int(get_required_setting("INDEXING_FLAG_TTL")) NOTIFICATION_TIMEOUT = int(get_required_setting("NOTIFICATION_TIMEOUT", "5")) +# Window for clubbing BATCHED notifications — also the flush cadence (seconds). +# Default 300 (5 min). Per-notification buffer rows precompute flush_after at +# enqueue time, so changing this only affects rows enqueued after the restart. +NOTIFICATION_CLUB_INTERVAL = int(os.environ.get("NOTIFICATION_CLUB_INTERVAL", "300")) +# Retention for terminal NotificationBuffer rows (DISPATCHED / DEAD_LETTER). +# PENDING rows are never GC'd regardless of age. +NOTIFICATION_BUFFER_RETENTION_DAYS = int( + os.environ.get("NOTIFICATION_BUFFER_RETENTION_DAYS", "7") +) ATOMIC_REQUESTS = CommonUtils.str_to_bool( os.environ.get("DJANGO_ATOMIC_REQUESTS", "False") ) diff --git a/backend/configuration/enums.py b/backend/configuration/enums.py index f35033a9f7..78bbeb7dfd 100644 --- a/backend/configuration/enums.py +++ b/backend/configuration/enums.py @@ -61,6 +61,14 @@ class ConfigKey(Enum): max_value=settings.MAX_PARALLEL_FILE_BATCHES_MAX_VALUE, ) + NOTIFICATION_CLUB_INTERVAL = ConfigSpec( + default=settings.NOTIFICATION_CLUB_INTERVAL, + value_type=ConfigType.INT, + help_text="Window (seconds) for clubbing BATCHED notifications.", + min_value=60, + max_value=7200, + ) + def cast_value(self, raw_value: Any): converters = { ConfigType.INT: int, diff --git a/backend/notification_v2/clubbed_renderer.py b/backend/notification_v2/clubbed_renderer.py new file mode 100644 index 0000000000..af20007f76 --- /dev/null +++ b/backend/notification_v2/clubbed_renderer.py @@ -0,0 +1,52 @@ +"""Backend dispatch entry for clubbed-notification rendering. + +Delegates the canonical envelope + Slack body to +``unstract.core.notification_clubbed_renderer`` so backend and worker +callbacks emit byte-identical receiver-visible payloads. This thin shim +keeps the ``render_clubbed_message`` platform dispatcher (uses +``PlatformType`` enum) backend-side; everything else lives in the shared +module. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from notification_v2.enums import PlatformType +from unstract.core.notification_clubbed_renderer import ( + build_envelope, + render_slack_text, +) + +logger = logging.getLogger(__name__) + +__all__ = ["build_envelope", "render_clubbed_message"] + + +def _render_for_slack(envelope: dict[str, Any]) -> dict[str, Any]: + """Wrap the rendered Slack mrkdwn body in the dict shape Slack expects.""" + return {"text": render_slack_text(envelope)} + + +def render_clubbed_message( + payloads: list[dict[str, Any]], + platform: str, +) -> dict[str, Any]: + """Top-level entry — returns the dispatch body for ``platform``. + + Used by every dispatch site so the receiver-visible payload is + identical regardless of caller. + """ + envelope = build_envelope(payloads) + if platform == PlatformType.SLACK.value: + return _render_for_slack(envelope) + if platform == PlatformType.API.value: + return envelope + # Unknown platform — fall back to the raw envelope and warn so misrouted + # rows don't drop silently. + logger.warning( + "Unknown platform %s for clubbed dispatch; returning raw envelope", + platform, + ) + return envelope diff --git a/backend/notification_v2/enums.py b/backend/notification_v2/enums.py index 991b08cac9..8ebced2a2e 100644 --- a/backend/notification_v2/enums.py +++ b/backend/notification_v2/enums.py @@ -1,5 +1,12 @@ from enum import Enum +from workflow_manager.workflow_v2.enums import ExecutionStatus + +# Single source of truth for "did this run fail for notification routing?". +# STOPPED is intentionally a failure here per migrations/0002_…notify_on_failures +# db_comment ("terminal status ERROR/STOPPED or any file in the run errored"). +FAILURE_STATUSES = frozenset({ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value}) + class NotificationType(Enum): WEBHOOK = "WEBHOOK" @@ -36,3 +43,41 @@ class PlatformType(Enum): @classmethod def choices(cls): return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls] + + +class DeliveryMode(Enum): + """Per-notification dispatch mode. + + Product ships every notification as BATCHED — events buffer into + ``NotificationBuffer`` and flush as one clubbed message per + (org, webhook_url, auth_sig) every ``NOTIFICATION_CLUB_INTERVAL`` seconds. + + The ``IMMEDIATE`` value is purely a historical DB value — no code reads + it anymore (the legacy synchronous-dispatch path was removed). The + column and enum value remain so existing rows don't break; both will be + dropped in a follow-up schema migration. + """ + + IMMEDIATE = "IMMEDIATE" + BATCHED = "BATCHED" + + @classmethod + def choices(cls): + return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls] + + +class BufferStatus(Enum): + """Lifecycle states for a NotificationBuffer row. + + PENDING — waiting for the next flush tick. + DISPATCHED — successfully sent as part of a clubbed message. + DEAD_LETTER — Celery exhausted retries; terminal, never re-picked. + """ + + PENDING = "PENDING" + DISPATCHED = "DISPATCHED" + DEAD_LETTER = "DEAD_LETTER" + + @classmethod + def choices(cls): + return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls] diff --git a/backend/notification_v2/helper.py b/backend/notification_v2/helper.py index a454b9d82b..9e99335ce8 100644 --- a/backend/notification_v2/helper.py +++ b/backend/notification_v2/helper.py @@ -1,48 +1,185 @@ +import hashlib +import json import logging +from collections.abc import Iterable +from datetime import timedelta from typing import Any -from notification_v2.enums import NotificationType, PlatformType -from notification_v2.models import Notification -from notification_v2.provider.notification_provider import NotificationProvider -from notification_v2.provider.registry import get_notification_provider +from account_v2.models import Organization +from django.utils import timezone + +from notification_v2.enums import ( + AuthorizationType, + BufferStatus, + PlatformType, +) +from notification_v2.models import Notification, NotificationBuffer logger = logging.getLogger(__name__) +# Used as a stable salt-free input for SHA-256 grouping; collisions are +# vanishingly improbable and the digest is never used as a security primitive. +_AUTH_SIG_NONE = "" + + +def compute_auth_sig(notification: Notification) -> str: + """SHA-256 hex of (auth_type, auth_key, auth_header) — never raw creds. + + Identical auth configs produce the same sig (so grouping clubs them); + differing configs split into separate groups. The tuple is JSON-encoded + before hashing so a literal delimiter byte inside auth_key/header cannot + cause two distinct tuples to collapse to the same digest. + """ + raw = json.dumps( + [ + notification.authorization_type or _AUTH_SIG_NONE, + notification.authorization_key or _AUTH_SIG_NONE, + notification.authorization_header or _AUTH_SIG_NONE, + ], + separators=(",", ":"), + ) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def webhook_url_hash(url: str | None) -> str: + """Short, log-safe fingerprint of a webhook URL (first 8 chars of SHA-256).""" + if not url: + return "none" + return hashlib.sha256(url.encode("utf-8")).hexdigest()[:8] + + +def get_org_club_interval_seconds(organization: Organization) -> int: + """Per-org override of NOTIFICATION_CLUB_INTERVAL, falling back to env default. + + Reads from the generic configuration KV table; returns the env-derived + default when the org has no override. The value is read at enqueue time + and baked into the row's flush_after — see mfbt §EC-2 / §EC-8: changing + the override only affects rows enqueued after the change. + """ + # Local import: configuration depends on Django settings at import time + # and notification_v2.helper is imported during app boot. + from configuration.enums import ConfigKey + from configuration.models import Configuration + + return int( + Configuration.get_value_by_organization( + ConfigKey.NOTIFICATION_CLUB_INTERVAL, organization + ) + ) + + +def build_webhook_headers(notification: Notification) -> dict[str, str]: + """Build HTTP headers for a webhook dispatch from the notification's auth. + + Used by the buffer flush in ``internal_api_views._dispatch_group`` to + pass live auth headers through to the Celery task. + """ + headers = {"Content-Type": "application/json"} + auth_type_raw = (notification.authorization_type or "").upper() + auth_key = notification.authorization_key + auth_header = notification.authorization_header + if auth_type_raw == AuthorizationType.BEARER.value and auth_key: + headers["Authorization"] = f"Bearer {auth_key}" + elif auth_type_raw == AuthorizationType.API_KEY.value and auth_key: + headers["Authorization"] = auth_key + elif ( + auth_type_raw == AuthorizationType.CUSTOM_HEADER.value + and auth_header + and auth_key + ): + headers[auth_header] = auth_key + return headers + + +def _resolve_organization(notification: Notification) -> Organization | None: + """Walk pipeline/api FK to find the owning org. Notification has no direct FK.""" + pipeline = notification.pipeline + if pipeline and pipeline.organization_id: + return pipeline.organization + api = notification.api + if api and api.organization_id: + return api.organization + return None + + +def dispatch_with_delivery_mode( + notifications: "Iterable[Notification]", + payload: dict[str, Any], + *, + error_context: str = "", +) -> None: + """Enqueue every active notification into ``NotificationBuffer``. + + Single dispatch path: each notification produces a buffer row that the + periodic flush ships as part of a clubbed message. An enqueue failure + on one row is logged but does not abort the loop — sibling notifications + still get their chance. + + ``error_context`` lets callers tag failures with their dispatch source + (pipeline id, api id) for easier triage. + """ + for notification in notifications: + try: + enqueue(notification, payload) + except Exception: + logger.exception( + "Failed to enqueue notification %s%s", + notification.id, + f" ({error_context})" if error_context else "", + ) + + +def enqueue(notification: Notification, payload: dict[str, Any]) -> NotificationBuffer: + """Buffer a single execution event for a notification. + + Computes auth_sig and flush_after at write time so existing PENDING rows + keep their original cadence even if NOTIFICATION_CLUB_INTERVAL or the + notification's auth changes mid-window. Returns the persisted row. + + Raises ValueError if the notification has no resolvable organization + (defensive — the FK chain via pipeline/api always provides one in practice). + """ + organization = _resolve_organization(notification) + if organization is None: + raise ValueError( + f"Notification {notification.id} has no resolvable organization " + "(neither pipeline nor api FK populated)" + ) + + interval_seconds = get_org_club_interval_seconds(organization) + flush_after = timezone.now() + timedelta(seconds=interval_seconds) + auth_sig = compute_auth_sig(notification) + platform = notification.platform or PlatformType.API.value + + # Stamp a buffered-at timestamp so renderers always have one to humanize. + # Worker callers already supply one; backend dispatchers + # (PipelineStatusPayload.to_dict) don't, so default here. + payload = { + **payload, + "timestamp": payload.get("timestamp") or timezone.now().isoformat(), + } + + buffer_row = NotificationBuffer.objects.create( + notification=notification, + organization=organization, + webhook_url=notification.url, + payload=payload, + platform=platform, + auth_sig=auth_sig, + flush_after=flush_after, + status=BufferStatus.PENDING.value, + ) -class NotificationHelper: - @classmethod - def send_notification(cls, notifications: list[Notification], payload: Any) -> None: - """Send notification Sends notifications using the appropriate provider - based on the notification type and platform. - - This method iterates through a list of `Notification` objects, determines the - appropriate notification provider based on the notification's type and - platform, and sends the notification with the provided payload. If an error - occurs due to an invalid notification type or platform, it logs the error. - - Args: - notifications (list[Notification]): A list of `Notification` instances to - be processed and sent. - payload (Any): The data to be sent with the notification. This can be any - format expected by the provider - - Returns: - None - """ - for notification in notifications: - notification_type = NotificationType(notification.notification_type) - platform_type = PlatformType(notification.platform) - try: - notification_provider = get_notification_provider( - notification_type, platform_type - ) - notifier: NotificationProvider = notification_provider( - notification=notification, payload=payload - ) - notifier.send() - logger.info(f"Sending notification to {notification}") - except ValueError as e: - logger.error( - f"Error in notification type {notification_type} and platform " - f"{platform_type} for notification {notification}: {e}" - ) + # Structured log: org + URL fingerprint only — never the raw URL or any + # part of the auth tuple. Downstream metrics consumers grep on metric=. + logger.info( + "metric=notification_buffer_enqueued_total platform=%s org_id=%s " + "webhook_url_hash=%s notification_id=%s buffer_id=%s flush_after=%s", + platform, + organization.organization_id, + webhook_url_hash(notification.url), + notification.id, + buffer_row.id, + flush_after.isoformat(), + ) + return buffer_row diff --git a/backend/notification_v2/internal_api_views.py b/backend/notification_v2/internal_api_views.py index 6843d5a50c..d823da302f 100644 --- a/backend/notification_v2/internal_api_views.py +++ b/backend/notification_v2/internal_api_views.py @@ -9,17 +9,33 @@ - These endpoints are not accessible from browsers and don't use session cookies """ +import json import logging +from datetime import timedelta +from typing import Any, cast from api_v2.models import APIDeployment -from django.http import JsonResponse +from django.conf import settings +from django.db import transaction +from django.db.models import Min, QuerySet +from django.http import HttpRequest, JsonResponse from django.shortcuts import get_object_or_404 +from django.utils import timezone from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from pipeline_v2.models import Pipeline from utils.organization_utils import filter_queryset_by_organization - -from notification_v2.models import Notification +from workflow_manager.workflow_v2.models.execution import WorkflowExecution + +from backend.celery_service import app as celery_app +from notification_v2.clubbed_renderer import render_clubbed_message +from notification_v2.enums import FAILURE_STATUSES, BufferStatus +from notification_v2.helper import ( + build_webhook_headers, + enqueue, + webhook_url_hash, +) +from notification_v2.models import Notification, NotificationBuffer logger = logging.getLogger(__name__) @@ -27,9 +43,68 @@ INTERNAL_SERVER_ERROR_MSG = "Internal server error" +def _load_execution(execution_id: str | None) -> WorkflowExecution | None: + """Best-effort lookup; returns None on missing id or unknown row.""" + if not execution_id: + return None + try: + return cast(WorkflowExecution, WorkflowExecution.objects.get(id=execution_id)) + except WorkflowExecution.DoesNotExist: + logger.warning("WorkflowExecution %s not found", execution_id) + return None + + +def _apply_failure_filter( + notifications_qs: QuerySet[Notification], + execution: WorkflowExecution | None, +) -> QuerySet[Notification]: + """Drop notify_on_failures=True rows on success runs. + + Mirrors the dispatch-side rule in backend/api_v2/notification.py and + backend/pipeline_v2/notification.py so both code paths agree on what + counts as a failure (status ∈ {ERROR, STOPPED} OR any file errored). + + No execution → no filter, preserving legacy "return every active row" + behavior for callers that don't pass execution_id. + """ + if execution is None: + return notifications_qs + failed_files = execution.failed_files or 0 + is_failure = execution.status in FAILURE_STATUSES or failed_files > 0 + if not is_failure: + notifications_qs = notifications_qs.filter(notify_on_failures=False) + return notifications_qs + + +def _execution_counts(execution: WorkflowExecution | None) -> dict[str, int]: + """File counts surfaced into webhook payloads. Empty dict on no execution.""" + if execution is None: + return {} + return { + "total_files": execution.total_files or 0, + "successful_files": execution.successful_files or 0, + "failed_files": execution.failed_files or 0, + } + + +def _serialize_notification(n: Notification) -> dict[str, Any]: + return { + "id": str(n.id), + "notification_type": n.notification_type, + "platform": n.platform, + "url": n.url, + "authorization_type": n.authorization_type, + "authorization_key": n.authorization_key, + "authorization_header": n.authorization_header, + "max_retries": n.max_retries, + "is_active": n.is_active, + "notify_on_failures": n.notify_on_failures, + } + + @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_pipeline_notifications(request, pipeline_id): +def get_pipeline_notifications(request: HttpRequest, pipeline_id: str) -> JsonResponse: """Get active notifications for a pipeline or API deployment. Used by callback worker to fetch notification configuration. @@ -41,83 +116,53 @@ def get_pipeline_notifications(request, pipeline_id): pipeline_queryset, request, "organization" ) + execution = _load_execution(request.GET.get("execution_id")) + counts = _execution_counts(execution) + if pipeline_queryset.exists(): pipeline = pipeline_queryset.first() - - # Get active notifications for this pipeline notifications = Notification.objects.filter(pipeline=pipeline, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) - + notifications = _apply_failure_filter(notifications, execution) + serialized = [_serialize_notification(n) for n in notifications] return JsonResponse( { "status": "success", "pipeline_id": str(pipeline.id), "pipeline_name": pipeline.pipeline_name, "pipeline_type": pipeline.pipeline_type, - "notifications": notifications_data, + "notifications": serialized, + "execution_counts": counts, } ) - else: - # If not found in Pipeline, try APIDeployment model - api_queryset = APIDeployment.objects.filter(id=pipeline_id) - api_queryset = filter_queryset_by_organization( - api_queryset, request, "organization" + + # If not found in Pipeline, try APIDeployment model + api_queryset = APIDeployment.objects.filter(id=pipeline_id) + api_queryset = filter_queryset_by_organization( + api_queryset, request, "organization" + ) + if api_queryset.exists(): + api = api_queryset.first() + notifications = Notification.objects.filter(api=api, is_active=True) + notifications = _apply_failure_filter(notifications, execution) + serialized = [_serialize_notification(n) for n in notifications] + return JsonResponse( + { + "status": "success", + "pipeline_id": str(api.id), + "pipeline_name": api.api_name, + "pipeline_type": "API", + "notifications": serialized, + "execution_counts": counts, + } ) - if api_queryset.exists(): - api = api_queryset.first() - - # Get active notifications for this API deployment - notifications = Notification.objects.filter(api=api, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) - - return JsonResponse( - { - "status": "success", - "pipeline_id": str(api.id), - "pipeline_name": api.api_name, - "pipeline_type": "API", - "notifications": notifications_data, - } - ) - else: - return JsonResponse( - { - "status": "error", - "message": "Pipeline or API deployment not found", - }, - status=404, - ) + return JsonResponse( + { + "status": "error", + "message": "Pipeline or API deployment not found", + }, + status=404, + ) except Exception as e: logger.error(f"Error getting pipeline notifications for {pipeline_id}: {e}") return JsonResponse( @@ -127,7 +172,7 @@ def get_pipeline_notifications(request, pipeline_id): @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_api_notifications(request, api_id): +def get_api_notifications(request: HttpRequest, api_id: str) -> JsonResponse: """Get active notifications for an API deployment. Used by callback worker to fetch notification configuration. @@ -140,24 +185,9 @@ def get_api_notifications(request, api_id): ) api = get_object_or_404(api_queryset) - # Get active notifications for this API + execution = _load_execution(request.GET.get("execution_id")) notifications = Notification.objects.filter(api=api, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) + notifications = _apply_failure_filter(notifications, execution) return JsonResponse( { @@ -165,7 +195,8 @@ def get_api_notifications(request, api_id): "api_id": str(api.id), "api_name": api.api_name, "display_name": api.display_name, - "notifications": notifications_data, + "notifications": [_serialize_notification(n) for n in notifications], + "execution_counts": _execution_counts(execution), } ) @@ -182,7 +213,7 @@ def get_api_notifications(request, api_id): @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_pipeline_data(request, pipeline_id): +def get_pipeline_data(request: HttpRequest, pipeline_id: str) -> JsonResponse: """Get basic pipeline data for notification purposes. Used by callback worker to determine pipeline type and name. @@ -218,7 +249,7 @@ def get_pipeline_data(request, pipeline_id): @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_api_data(request, api_id): +def get_api_data(request: HttpRequest, api_id: str) -> JsonResponse: """Get basic API deployment data for notification purposes. Used by callback worker to determine API name and details. @@ -250,3 +281,312 @@ def get_api_data(request, api_id): return JsonResponse( {"status": "error", "message": INTERNAL_SERVER_ERROR_MSG}, status=500 ) + + +# `execution_id` is required except for INPROGRESS, which fires from the +# scheduler (workers/scheduler/tasks.py, UN-2850) before WorkflowExecution +# exists. INPROGRESS rows therefore store execution_id=null — receivers +# cannot correlate with execution logs until the producer-reorder lands +# (UN-3056). +_ENQUEUE_REQUIRED_FIELDS = ( + "notification_id", + "pipeline_id", + "pipeline_name", + "status", + "platform", + "execution_id", +) + + +@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only +@require_http_methods(["POST"]) +def enqueue_notification_buffer(request: HttpRequest) -> JsonResponse: + """Buffer one execution event from a callback worker. + + Worker code is model-free: it forwards a notification_id + structured + payload here and lets the backend write the NotificationBuffer row. + Rejects rows whose source notification is not BATCHED so a worker + routing bug cannot silently divert non-BATCHED traffic into the buffer. + """ + try: + body = json.loads(request.body.decode("utf-8") or "{}") + except json.JSONDecodeError: + return JsonResponse( + {"status": "error", "message": "Invalid JSON body"}, status=400 + ) + + missing_fields = [f for f in _ENQUEUE_REQUIRED_FIELDS if not body.get(f)] + # INPROGRESS is the one status legitimately allowed to omit execution_id + # (see comment on _ENQUEUE_REQUIRED_FIELDS). + if ( + body.get("status") == Pipeline.PipelineStatus.INPROGRESS + and "execution_id" in missing_fields + ): + missing_fields.remove("execution_id") + if missing_fields: + return JsonResponse( + { + "status": "error", + "message": f"Missing required fields: {', '.join(missing_fields)}", + }, + status=400, + ) + + try: + notification = Notification.objects.get(id=body["notification_id"]) + except Notification.DoesNotExist: + return JsonResponse( + {"status": "error", "message": "Notification not found"}, status=404 + ) + + # INPROGRESS fires from the scheduler before a WorkflowExecution exists, + # so the GET-side `_apply_failure_filter` cannot run (no execution → no + # filter applied) and returns notify_on_failures=True rows too. Drop the + # event here so failure-only subscribers never receive a run-start. + if ( + notification.notify_on_failures + and body.get("status") == Pipeline.PipelineStatus.INPROGRESS + ): + return JsonResponse({"status": "ok", "buffer_row_id": None}) + + # type / timestamp / additional_data stay optional during rollout — older + # worker builds that don't forward them still produce a usable row + # (renderer falls back to "Type: —" / no Additional Data line). + payload = { + "type": body.get("type", ""), + "execution_id": body.get("execution_id"), + "pipeline_id": body["pipeline_id"], + "pipeline_name": body["pipeline_name"], + "status": body["status"], + "error_message": body.get("error_message"), + "platform": body["platform"], + "timestamp": body.get("timestamp"), + "additional_data": body.get("additional_data") or {}, + } + try: + buffer_row = enqueue(notification, payload) + except ValueError as e: + return JsonResponse({"status": "error", "message": str(e)}, status=400) + + return JsonResponse( + {"status": "success", "buffer_row_id": str(buffer_row.id)}, status=201 + ) + + +def _gc_terminal_rows() -> int: + """Delete buffer rows past the retention window. + + Two sweeps: + - Terminal rows (DISPATCHED / DEAD_LETTER) older than the retention + window: hygiene for completed work. + - PENDING rows whose source notification has been deactivated and + whose ``flush_after`` has aged past the same window: ``_dispatch_group`` + filters ``notification__is_active=True``, so without this sweep + these rows are unreachable from both dispatch and GC and would + accumulate forever in the partial PENDING index. + + PENDING rows attached to active notifications are intentionally + untouched regardless of age — they represent live work the flush + job still owns. + """ + cutoff = timezone.now() - timedelta(days=settings.NOTIFICATION_BUFFER_RETENTION_DAYS) + terminal_deleted, _ = NotificationBuffer.objects.filter( + status__in=[BufferStatus.DISPATCHED.value, BufferStatus.DEAD_LETTER.value], + created_at__lt=cutoff, + ).delete() + inactive_deleted, _ = NotificationBuffer.objects.filter( + status=BufferStatus.PENDING.value, + notification__is_active=False, + flush_after__lt=cutoff, + ).delete() + return int(terminal_deleted) + int(inactive_deleted) + + +def _send_clubbed( + *, + url: str, + body: Any, + headers: dict[str, str], + platform: str, + max_retries: int, + buffer_ids: list[str], + org_id: Any, +) -> None: + """Send the clubbed Celery task after the DB transition has committed. + + Runs as a ``transaction.on_commit`` callback so a rolled-back UPDATE can + never leave a broker-queued message orphaned (the prior order — send + then update — risked duplicate delivery if the UPDATE failed). On broker + failure we revert rows back to PENDING in a separate transaction so the + next flush tick retries cleanly. + """ + try: + celery_app.send_task( + "send_webhook_notification", + args=[url, body, headers, settings.NOTIFICATION_TIMEOUT], + kwargs={ + "max_retries": max_retries, + "retry_delay": 10, + "platform": platform, + }, + queue="notifications", + link_error=celery_app.signature( + "notification_v2.mark_buffer_dead_letter", + kwargs={"buffer_row_ids": buffer_ids}, + ), + ) + logger.info( + "metric=notification_batch_dispatched_total platform=%s result=success " + "org_id=%s webhook_url_hash=%s rows=%d", + platform, + org_id, + webhook_url_hash(url), + len(buffer_ids), + ) + except Exception: + logger.exception( + "metric=notification_batch_dispatched_total platform=%s " + "result=broker_failure org_id=%s webhook_url_hash=%s rows=%d", + platform, + org_id, + webhook_url_hash(url), + len(buffer_ids), + ) + # Revert outside the committed transaction so a transient broker + # outage degrades to "retried next tick" rather than "stuck DISPATCHED". + NotificationBuffer.objects.filter(id__in=buffer_ids).update( + status=BufferStatus.PENDING.value, + dispatched_at=None, + ) + + +def _dispatch_group( + org_id: Any, + webhook_url: str, + auth_sig: str, + platform: str, +) -> tuple[int, int]: + """Dispatch a single (org, url, auth_sig, platform) group; returns (rows, succeeded). + + Caller already filtered groups to MIN(flush_after) <= now. Locks rows + with SKIP LOCKED so a sibling replica skips them rather than blocking. + Re-fetches the source Notification each time for live auth (record may + have been edited between enqueue and flush). + """ + with transaction.atomic(): + rows = list( + NotificationBuffer.objects.select_for_update(skip_locked=True) + .select_related("notification") + .filter( + status=BufferStatus.PENDING.value, + organization_id=org_id, + webhook_url=webhook_url, + auth_sig=auth_sig, + platform=platform, + notification__is_active=True, + ) + .order_by("created_at")[:_PROCESS_BUFFER_CAP] + ) + if not rows: + # Either another replica claimed the rows (SKIP LOCKED) or they + # transitioned out of PENDING between the GROUP BY scan and the + # row-level lock. Either way: nothing to do here. + return 0, 0 + + # Live auth — read from the FIRST row's notification. If multiple + # notifications collide on (url, auth_sig, platform) we have, by + # definition, identical auth + format, so this is safe. Retry budget + # is the MAX across rows: there's a single HTTP call per batch, so + # the most retry-tolerant subscriber's intent wins; using the first + # row's value would silently truncate everyone else's retry budget. + first_notification = rows[0].notification + payloads = [r.payload for r in rows] + body = render_clubbed_message(payloads, platform) + headers = build_webhook_headers(first_notification) + buffer_ids = [str(r.id) for r in rows] + max_retries = max(r.notification.max_retries for r in rows) + + # Mark DISPATCHED first; if commit succeeds the on_commit hook + # publishes the broker task. If commit fails, rows stay PENDING and + # no task is published — eliminates the broker-vs-DB duplicate-send + # race that bit us when the order was reversed. + now = timezone.now() + NotificationBuffer.objects.filter(id__in=buffer_ids).update( + status=BufferStatus.DISPATCHED.value, + dispatched_at=now, + ) + transaction.on_commit( + lambda: _send_clubbed( + url=first_notification.url, + body=body, + headers=headers, + platform=platform, + max_retries=max_retries, + buffer_ids=buffer_ids, + org_id=org_id, + ) + ) + return len(rows), len(rows) + + +# Per-group cap; matches the renderer's MAX_BATCH_SIZE so the rendered +# events list and the dispatched row set stay in lock-step. Anything beyond +# this rolls into the next flush tick. +_PROCESS_BUFFER_CAP = 500 + + +@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only +@require_http_methods(["POST"]) +def process_notification_buffer(request: HttpRequest) -> JsonResponse: + """Flush PENDING groups that have hit their flush_after; then GC. + + Algorithm: + 1. GROUP BY (org, url, auth_sig, platform), HAVING MIN(flush_after) <= NOW() + 2. For each group, in its own transaction: lock-skip-locked rows, + render, mark rows DISPATCHED, on_commit-dispatch a single Celery task. + 3. Sweep terminal rows older than NOTIFICATION_BUFFER_RETENTION_DAYS. + + Concurrency: SELECT FOR UPDATE SKIP LOCKED makes parallel calls safe — + each replica skips groups another worker is already dispatching. + """ + now = timezone.now() + groups = list( + NotificationBuffer.objects.filter(status=BufferStatus.PENDING.value) + .values("organization_id", "webhook_url", "auth_sig", "platform") + .annotate(earliest_flush=Min("flush_after")) + .filter(earliest_flush__lte=now) + ) + + dispatched_groups = 0 + dispatched_rows = 0 + for group in groups: + try: + rows, _succeeded = _dispatch_group( + org_id=group["organization_id"], + webhook_url=group["webhook_url"], + auth_sig=group["auth_sig"], + platform=group["platform"], + ) + except Exception: + logger.exception( + "Failed dispatching group org=%s url_hash=%s", + group["organization_id"], + webhook_url_hash(group["webhook_url"]), + ) + continue + if rows > 0: + dispatched_groups += 1 + dispatched_rows += rows + + gc_deleted = _gc_terminal_rows() + return JsonResponse( + { + "status": "success", + "dispatched_groups": dispatched_groups, + "dispatched_rows": dispatched_rows, + # DEAD_LETTER transitions are async (Celery link_error) — this + # response only covers transitions visible to this request. + "dead_letter_rows": 0, + "gc_deleted_rows": gc_deleted, + } + ) diff --git a/backend/notification_v2/internal_urls.py b/backend/notification_v2/internal_urls.py index 0414761089..a0f87f0250 100644 --- a/backend/notification_v2/internal_urls.py +++ b/backend/notification_v2/internal_urls.py @@ -21,6 +21,17 @@ router.register(r"", WebhookInternalViewSet, basename="webhook-internal") urlpatterns = [ + # Buffered (clubbed) notification dispatch endpoints + path( + "buffer/enqueue/", + internal_api_views.enqueue_notification_buffer, + name="enqueue_notification_buffer", + ), + path( + "buffer/process/", + internal_api_views.process_notification_buffer, + name="process_notification_buffer", + ), # Notification data endpoints for workers path( "pipeline//notifications/", diff --git a/backend/notification_v2/migrations/0002_notification_notify_on_failures.py b/backend/notification_v2/migrations/0002_notification_notify_on_failures.py new file mode 100644 index 0000000000..ce0c3535b5 --- /dev/null +++ b/backend/notification_v2/migrations/0002_notification_notify_on_failures.py @@ -0,0 +1,23 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("notification_v2", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="notification", + name="notify_on_failures", + field=models.BooleanField( + default=False, + db_comment=( + "When True, fire only on failed runs — terminal status " + "ERROR/STOPPED or any file in the run errored (partial " + "failure). When False (default), fire on every terminal " + "completion." + ), + ), + ), + ] diff --git a/backend/notification_v2/migrations/0003_add_notification_buffer.py b/backend/notification_v2/migrations/0003_add_notification_buffer.py new file mode 100644 index 0000000000..5090eff18a --- /dev/null +++ b/backend/notification_v2/migrations/0003_add_notification_buffer.py @@ -0,0 +1,154 @@ +import uuid + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("account_v2", "0001_initial"), + ("notification_v2", "0002_notification_notify_on_failures"), + ] + + operations = [ + migrations.AddField( + model_name="notification", + name="delivery_mode", + field=models.CharField( + choices=[("IMMEDIATE", "Immediate"), ("BATCHED", "Batched")], + default="BATCHED", + max_length=16, + db_comment=( + "BATCHED (default) buffers events and dispatches a single " + "clubbed message per (org, webhook_url, auth_sig) every " + "NOTIFICATION_CLUB_INTERVAL. IMMEDIATE fires on every " + "completion." + ), + ), + ), + migrations.CreateModel( + name="NotificationBuffer", + fields=[ + ("created_at", models.DateTimeField(auto_now_add=True)), + ("modified_at", models.DateTimeField(auto_now=True)), + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "webhook_url", + models.URLField( + db_comment="Denormalized destination URL; grouping key.", + ), + ), + ( + "payload", + models.JSONField( + db_comment=( + "Pre-structured execution data (execution_id, status, " + "error_message, pipeline_name, pipeline_type) — NOT a " + "final rendered message. The renderer formats this at " + "dispatch time." + ), + ), + ), + ( + "platform", + models.CharField( + choices=[("SLACK", "Slack"), ("API", "Api")], + max_length=50, + db_comment=( + "SLACK / API — drives renderer selection at flush time." + ), + ), + ), + ( + "auth_sig", + models.CharField( + max_length=64, + db_comment=( + "SHA-256 hex of (auth_type + auth_key + auth_header), " + "computed at enqueue time. Grouping key — never store " + "raw credentials here." + ), + ), + ), + ( + "flush_after", + models.DateTimeField( + db_comment=( + "created_at + NOTIFICATION_CLUB_INTERVAL, precomputed " + "at enqueue. Read-at-enqueue contract: changing the " + "env var only affects rows enqueued after the restart." + ), + ), + ), + ("dispatched_at", models.DateTimeField(blank=True, null=True)), + ( + "status", + models.CharField( + choices=[ + ("PENDING", "Pending"), + ("DISPATCHED", "Dispatched"), + ("DEAD_LETTER", "Dead letter"), + ], + default="PENDING", + max_length=16, + db_comment=( + "PENDING -> DISPATCHED on success, " + "PENDING -> DEAD_LETTER on retry exhaustion." + ), + ), + ), + ( + "notification", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="buffer_rows", + to="notification_v2.notification", + db_comment=( + "Source Notification. Cascade-delete is intentional: " + "removing a Notification expresses intent to stop all " + "future deliveries, including buffered ones." + ), + ), + ), + ( + "organization", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="notification_buffer_rows", + to="account_v2.organization", + db_comment=( + "Tenant scope. Mandatory grouping key — prevents " + "cross-tenant leakage at flush time." + ), + ), + ), + ], + options={ + "verbose_name": "Notification Buffer", + "verbose_name_plural": "Notification Buffers", + "db_table": "notification_buffer", + }, + ), + migrations.AddIndex( + model_name="notificationbuffer", + index=models.Index( + condition=models.Q(("status", "PENDING")), + fields=[ + "organization", + "webhook_url", + "auth_sig", + "platform", + "flush_after", + ], + name="idx_notif_buffer_pending", + ), + ), + ] diff --git a/backend/notification_v2/models.py b/backend/notification_v2/models.py index 489a8c827e..ea8b68f4ad 100644 --- a/backend/notification_v2/models.py +++ b/backend/notification_v2/models.py @@ -1,13 +1,21 @@ import uuid +from account_v2.models import Organization from api_v2.models import APIDeployment from django.db import models from pipeline_v2.models import Pipeline from utils.models.base_model import BaseModel -from .enums import AuthorizationType, NotificationType, PlatformType +from .enums import ( + AuthorizationType, + BufferStatus, + DeliveryMode, + NotificationType, + PlatformType, +) NOTIFICATION_NAME_MAX_LENGTH = 255 +AUTH_SIG_LENGTH = 64 # SHA-256 hex digest class Notification(BaseModel): @@ -47,6 +55,24 @@ class Notification(BaseModel): default=True, db_comment="Flag indicating whether the notification is active or not.", ) + notify_on_failures = models.BooleanField( + default=False, + db_comment=( + "When True, fire only on failed runs — terminal status ERROR/STOPPED " + "or any file in the run errored (partial failure). When False " + "(default), fire on every terminal completion." + ), + ) + delivery_mode = models.CharField( + max_length=16, + choices=DeliveryMode.choices(), + default=DeliveryMode.BATCHED.value, + db_comment=( + "BATCHED (default) buffers events and dispatches a single clubbed " + "message per (org, webhook_url, auth_sig) every " + "NOTIFICATION_CLUB_INTERVAL. IMMEDIATE fires on every completion." + ), + ) # Foreign keys to specific models pipeline = models.ForeignKey( Pipeline, @@ -92,3 +118,100 @@ def __str__(self): f"Notification {self.id}: (Type: {self.notification_type}, " f"Platform: {self.platform}, Url: {self.url}))" ) + + +class NotificationBuffer(BaseModel): + """Per-execution event buffered for a BATCHED notification. + + One row is written per workflow completion when the source Notification + has delivery_mode=BATCHED. The flush job groups rows by + (organization, webhook_url, auth_sig), renders one clubbed message per + group, and dispatches via the existing send_webhook_notification Celery + task. Group key includes auth_sig because two notifications may share the + same URL but use different credentials — they must dispatch separately. + """ + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + notification = models.ForeignKey( + Notification, + on_delete=models.CASCADE, + related_name="buffer_rows", + db_comment=( + "Source Notification. Cascade-delete is intentional: removing a " + "Notification expresses intent to stop all future deliveries, " + "including buffered ones." + ), + ) + organization = models.ForeignKey( + Organization, + on_delete=models.CASCADE, + related_name="notification_buffer_rows", + db_comment=( + "Tenant scope. Mandatory grouping key — prevents cross-tenant " + "leakage at flush time." + ), + ) + webhook_url = models.URLField( + db_comment="Denormalized destination URL; grouping key.", + ) + payload = models.JSONField( + db_comment=( + "Pre-structured execution data (execution_id, status, error_message, " + "pipeline_name, pipeline_type) — NOT a final rendered message. The " + "renderer formats this at dispatch time." + ), + ) + platform = models.CharField( + max_length=50, + choices=PlatformType.choices(), + db_comment="SLACK / API — drives renderer selection at flush time.", + ) + auth_sig = models.CharField( + max_length=AUTH_SIG_LENGTH, + db_comment=( + "SHA-256 hex of (auth_type + auth_key + auth_header), computed at " + "enqueue time. Grouping key — never store raw credentials here." + ), + ) + flush_after = models.DateTimeField( + db_comment=( + "created_at + NOTIFICATION_CLUB_INTERVAL, precomputed at enqueue. " + "Read-at-enqueue contract: changing the env var only affects rows " + "enqueued after the restart." + ), + ) + dispatched_at = models.DateTimeField(null=True, blank=True) + status = models.CharField( + max_length=16, + choices=BufferStatus.choices(), + default=BufferStatus.PENDING.value, + db_comment="PENDING -> DISPATCHED on success, PENDING -> DEAD_LETTER on retry exhaustion.", + ) + + class Meta: + verbose_name = "Notification Buffer" + verbose_name_plural = "Notification Buffers" + db_table = "notification_buffer" + indexes = [ + # Partial covering index — supports Index Only Scans on the flush + # GROUP BY query and bounds index size to live PENDING backlog. + # `platform` is part of the grouping key so SLACK and API rows on + # the same (org, url, auth) split into separate dispatches. + models.Index( + fields=[ + "organization", + "webhook_url", + "auth_sig", + "platform", + "flush_after", + ], + name="idx_notif_buffer_pending", + condition=models.Q(status=BufferStatus.PENDING.value), + ), + ] + + def __str__(self) -> str: + return ( + f"NotificationBuffer {self.id}: status={self.status} " + f"flush_after={self.flush_after.isoformat() if self.flush_after else 'n/a'}" + ) diff --git a/backend/notification_v2/provider/webhook/api_webhook.py b/backend/notification_v2/provider/webhook/api_webhook.py index 9f1264b9e7..9864d84a4a 100644 --- a/backend/notification_v2/provider/webhook/api_webhook.py +++ b/backend/notification_v2/provider/webhook/api_webhook.py @@ -1,13 +1,30 @@ +from typing import Any + +from notification_v2.clubbed_renderer import build_envelope from notification_v2.provider.webhook.webhook import Webhook class APIWebhook(Webhook): - def send(self): - """Send the API webhook notification.""" + def send(self) -> None: + """Send the API webhook notification. + + Wraps the IMMEDIATE event in the canonical envelope before queueing + so the receiver-visible JSON shape matches BATCHED dispatches — + `{"summary": {...}, "events": [{...}]}`. + """ + self.payload = self.format_payload() super().send() - def get_headers(self): + def get_headers(self) -> dict[str, str]: """API-specific headers.""" headers = super().get_headers() headers["Content-Type"] = "application/json" return headers + + def format_payload(self) -> dict[str, Any]: + """Wrap a single IMMEDIATE event in the canonical envelope. + + Receivers parse the same `{summary, events}` shape regardless of + whether the dispatch was IMMEDIATE or BATCHED. + """ + return build_envelope(payloads=[self.payload]) diff --git a/backend/notification_v2/provider/webhook/slack_webhook.py b/backend/notification_v2/provider/webhook/slack_webhook.py index 45e260b680..0fda635d74 100644 --- a/backend/notification_v2/provider/webhook/slack_webhook.py +++ b/backend/notification_v2/provider/webhook/slack_webhook.py @@ -1,60 +1,33 @@ import logging +from typing import Any +from notification_v2.clubbed_renderer import render_clubbed_message +from notification_v2.enums import PlatformType from notification_v2.provider.webhook.webhook import Webhook logger = logging.getLogger(__name__) class SlackWebhook(Webhook): - def send(self): + def send(self) -> None: """Send the Slack webhook notification.""" formatted_payload = self.format_payload() self.payload = formatted_payload super().send() - def get_headers(self): + def get_headers(self) -> dict[str, str]: """Slack-specific headers.""" headers = super().get_headers() headers["Content-Type"] = "application/json" return headers - def format_payload(self) -> dict: - """Format the payload to match Slack's expected structure.""" - if "text" not in self.payload: - # Construct a basic Slack message with 'text' field - formatted_payload = { - "text": "Notification", - "blocks": self.create_blocks_from_payload(), - } - else: - # If 'text' is already present, format accordingly - formatted_payload = { - "text": self.payload.pop("text"), - "blocks": self.create_blocks_from_payload(), - } - return formatted_payload + def format_payload(self) -> dict[str, Any]: + """Render the IMMEDIATE event through the canonical envelope. - def create_blocks_from_payload(self) -> list: - """Create Slack blocks from the given payload.""" - blocks = [] - # Header - blocks.append( - { - "type": "section", - "text": {"type": "mrkdwn", "text": "*Unstract Update:*"}, - } + Single shared renderer for IMMEDIATE and BATCHED so receivers see the + same Slack body shape regardless of delivery mode. + """ + return render_clubbed_message( + payloads=[self.payload], + platform=PlatformType.SLACK.value, ) - # Add a divider for separation - blocks.append({"type": "divider"}) - # Add each key-value pair to the blocks - for key, value in self.payload.items(): - formatted_key = key.replace("_", " ").title() - blocks.append( - { - "type": "section", - "text": {"type": "mrkdwn", "text": f"*{formatted_key}:* {value}"}, - } - ) - # Footer - blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": "*---*"}}) - return blocks diff --git a/backend/notification_v2/provider/webhook/webhook.py b/backend/notification_v2/provider/webhook/webhook.py index 549336a5f9..40ddc53e61 100644 --- a/backend/notification_v2/provider/webhook/webhook.py +++ b/backend/notification_v2/provider/webhook/webhook.py @@ -17,7 +17,7 @@ class HeaderConstants: class Webhook(NotificationProvider): - def send(self): + def send(self) -> None: """Send the webhook notification.""" try: headers = self.get_headers() @@ -51,7 +51,7 @@ def validate(self): raise ValueError("Payload is required.") return super().validate() - def get_headers(self): + def get_headers(self) -> dict[str, str]: """Get the headers for the notification based on the authorization type and key. Raises: @@ -60,7 +60,7 @@ def get_headers(self): Returns: dict[str, str]: A dictionary containing the headers. """ - headers = {} + headers: dict[str, str] = {} try: authorization_type = AuthorizationType( self.notification.authorization_type.upper() diff --git a/backend/notification_v2/serializers.py b/backend/notification_v2/serializers.py index 115487c481..79802be90b 100644 --- a/backend/notification_v2/serializers.py +++ b/backend/notification_v2/serializers.py @@ -5,6 +5,14 @@ from .models import Notification +class NotificationSettingsSerializer(serializers.Serializer): + """Org-scoped notification batching settings (UNS-611 v2).""" + + # Bounds (1 min – 2 h) mirror ConfigKey.NOTIFICATION_CLUB_INTERVAL so DRF + # returns a structured 400 before ConfigKey.cast_value re-raises. + club_interval_seconds = serializers.IntegerField(min_value=60, max_value=7200) + + class NotificationSerializer(serializers.ModelSerializer): notification_type = serializers.ChoiceField(choices=NotificationType.choices()) authorization_type = serializers.ChoiceField(choices=AuthorizationType.choices()) @@ -12,6 +20,7 @@ class NotificationSerializer(serializers.ModelSerializer): max_retries = serializers.IntegerField( max_value=4, min_value=0, default=0, required=False ) + notify_on_failures = serializers.BooleanField(default=False, required=False) class Meta: model = Notification diff --git a/backend/notification_v2/tasks.py b/backend/notification_v2/tasks.py new file mode 100644 index 0000000000..f14a07ec55 --- /dev/null +++ b/backend/notification_v2/tasks.py @@ -0,0 +1,51 @@ +"""Celery tasks owned by notification_v2. + +Currently hosts ``mark_buffer_dead_letter`` — a thin task attached as a +Celery ``link_error`` to the clubbed dispatch chain. When the underlying +``send_webhook_notification`` task exhausts retries, this task converts +the buffered rows from PENDING/DISPATCHED to terminal DEAD_LETTER so the +flush job will not re-pick them. +""" + +from __future__ import annotations + +import logging +from collections.abc import Iterable +from typing import Any + +from backend.celery_service import app as celery_app +from notification_v2.enums import BufferStatus +from notification_v2.models import NotificationBuffer + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="notification_v2.mark_buffer_dead_letter") +def mark_buffer_dead_letter( + request: Any, + exc: Any = None, + traceback: Any = None, + *, + buffer_row_ids: Iterable[str] | None = None, +) -> int: + """Mark a clubbed dispatch's rows as DEAD_LETTER on terminal failure. + + Celery's ``link_error`` signature passes ``(request, exc, traceback)`` to + the callback; the actual buffer ids are bound at dispatch time via task + kwargs. Returns the row count for visibility in flower. + """ + if not buffer_row_ids: + logger.warning( + "mark_buffer_dead_letter invoked without buffer_row_ids — nothing to do" + ) + return 0 + ids = list(buffer_row_ids) + updated: int = NotificationBuffer.objects.filter(id__in=ids).update( + status=BufferStatus.DEAD_LETTER.value + ) + logger.warning( + "metric=notification_batch_dispatched_total result=dead_letter rows=%d exc=%r", + updated, + exc, + ) + return updated diff --git a/backend/notification_v2/urls.py b/backend/notification_v2/urls.py index 2e356b4003..e41bb00f3a 100644 --- a/backend/notification_v2/urls.py +++ b/backend/notification_v2/urls.py @@ -1,7 +1,7 @@ from django.urls import path from rest_framework.urlpatterns import format_suffix_patterns -from .views import NotificationViewSet +from .views import NotificationSettingsView, NotificationViewSet notification_list = NotificationViewSet.as_view({"get": "list", "post": "create"}) notification_detail = NotificationViewSet.as_view( @@ -16,6 +16,13 @@ urlpatterns = format_suffix_patterns( [ path("", notification_list, name="notification-list"), + # Org-scoped notification batching settings (UNS-611 v2). Mounted + # before the route so "settings" is not interpreted as a UUID. + path( + "settings/", + NotificationSettingsView.as_view(), + name="notification-settings", + ), path("/", notification_detail, name="notification-detail"), path( "pipeline//", diff --git a/backend/notification_v2/views.py b/backend/notification_v2/views.py index 1410256ab6..207d4acc6b 100644 --- a/backend/notification_v2/views.py +++ b/backend/notification_v2/views.py @@ -1,14 +1,26 @@ +import logging + from api_v2.deployment_helper import DeploymentHelper from api_v2.exceptions import APINotFound +from configuration.enums import ConfigKey +from configuration.models import Configuration from pipeline_v2.exceptions import PipelineNotFound from pipeline_v2.models import Pipeline from pipeline_v2.pipeline_processor import PipelineProcessor -from rest_framework import viewsets +from platform_api.permissions import IsOrganizationAdmin +from rest_framework import status, viewsets +from rest_framework.permissions import IsAuthenticated +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView +from utils.user_context import UserContext from notification_v2.constants import NotificationUrlConstant from .models import Notification -from .serializers import NotificationSerializer +from .serializers import NotificationSerializer, NotificationSettingsSerializer + +logger = logging.getLogger(__name__) class NotificationViewSet(viewsets.ModelViewSet): @@ -39,3 +51,47 @@ def get_queryset(self): queryset = queryset.filter(api=api) return queryset + + +class NotificationSettingsView(APIView): + """Org-scoped notification batching settings — currently just the club interval. + + GET returns the org's effective interval (override or env-derived default). + PATCH writes/updates the override via the generic configuration KV table. + + Read-at-enqueue contract (mfbt §EC-2 / §EC-8): updates take effect for + notifications enqueued after the change. Existing PENDING buffer rows + keep their original flush_after. + """ + + permission_classes = [IsAuthenticated, IsOrganizationAdmin] + + def get(self, request: Request) -> Response: + organization = UserContext.get_organization() + value = Configuration.get_value_by_organization( + ConfigKey.NOTIFICATION_CLUB_INTERVAL, organization + ) + return Response({"club_interval_seconds": int(value)}) + + def patch(self, request: Request) -> Response: + serializer = NotificationSettingsSerializer(data=request.data, partial=True) + serializer.is_valid(raise_exception=True) + organization = UserContext.get_organization() + new_value = serializer.validated_data.get("club_interval_seconds") + if new_value is None: + return Response( + {"detail": "club_interval_seconds is required."}, + status=status.HTTP_400_BAD_REQUEST, + ) + # ConfigKey.cast_value enforces type + any future bounds; bubble its + # ValueError up as a 400 instead of letting it 500. + try: + ConfigKey.NOTIFICATION_CLUB_INTERVAL.cast_value(new_value) + except ValueError as exc: + return Response({"detail": str(exc)}, status=status.HTTP_400_BAD_REQUEST) + Configuration.objects.update_or_create( + organization=organization, + key=ConfigKey.NOTIFICATION_CLUB_INTERVAL.name, + defaults={"value": str(new_value), "enabled": True}, + ) + return Response({"club_interval_seconds": int(new_value)}) diff --git a/backend/pipeline_v2/dto.py b/backend/pipeline_v2/dto.py index 5a87ba0825..d0f27e7943 100644 --- a/backend/pipeline_v2/dto.py +++ b/backend/pipeline_v2/dto.py @@ -10,6 +10,9 @@ def __init__( status: str, execution_id: str | None = None, error_message: str | None = None, + total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, ): self.type = type self.pipeline_id = pipeline_id @@ -17,14 +20,26 @@ def __init__( self.status = status self.execution_id = execution_id self.error_message = error_message + self.total_files = total_files + self.successful_files = successful_files + self.failed_files = failed_files def to_dict(self) -> dict[str, Any]: - """Convert the payload DTO to a dictionary.""" - payload = { + """Convert the payload DTO to a dictionary. + + File counts are nested in `additional_data` to match the worker-path + payload shape (NotificationPayload.from_execution_status). + """ + payload: dict[str, Any] = { "type": self.type, "pipeline_id": str(self.pipeline_id), "pipeline_name": self.pipeline_name, "status": self.status, + "additional_data": { + "total_files": self.total_files or 0, + "successful_files": self.successful_files or 0, + "failed_files": self.failed_files or 0, + }, } if self.execution_id: payload["execution_id"] = str(self.execution_id) diff --git a/backend/pipeline_v2/notification.py b/backend/pipeline_v2/notification.py index dbfc0dea52..9537cad47b 100644 --- a/backend/pipeline_v2/notification.py +++ b/backend/pipeline_v2/notification.py @@ -1,7 +1,9 @@ import logging -from notification_v2.helper import NotificationHelper +from notification_v2.enums import FAILURE_STATUSES +from notification_v2.helper import dispatch_with_delivery_mode from notification_v2.models import Notification +from workflow_manager.workflow_v2.models.execution import WorkflowExecution from pipeline_v2.dto import PipelineStatusPayload from pipeline_v2.models import Pipeline @@ -23,11 +25,57 @@ def __init__( self.error_message = error_message self.execution_id = execution_id - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for pipeline {self.pipeline}") + def _load_execution(self) -> WorkflowExecution | None: + """Load the WorkflowExecution row for this dispatch, if available. + + Falls back to None when no execution_id was supplied (e.g. legacy + callers); callers must handle the None case. + """ + if not self.execution_id: + return None + try: + return WorkflowExecution.objects.get(id=self.execution_id) + except WorkflowExecution.DoesNotExist: + logger.warning( + "WorkflowExecution %s not found for pipeline notification", + self.execution_id, + ) + return None + + def send(self) -> None: + execution = self._load_execution() + # Source of truth for partial-failure detection is the per-run aggregate + # written by the worker callback. Pipeline.last_run_status is a coarse + # collapse (ERROR/STOPPED → FAILURE) that hides per-file errors when + # at least one file succeeded. + failed_files = (execution.failed_files or 0) if execution else 0 + execution_status = execution.status if execution else None + is_failure = ( + execution_status in FAILURE_STATUSES + or failed_files > 0 + or self.pipeline.last_run_status == Pipeline.PipelineStatus.FAILURE + ) + if not is_failure: + self.notifications = self.notifications.filter(notify_on_failures=False) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for pipeline %s (status=%s, failed_files=%s)", + self.pipeline, + self.pipeline.last_run_status, + failed_files, + ) return - logger.info(f"Sending pipeline status notification for pipeline {self.pipeline}") + successful_files = (execution.successful_files or 0) if execution else 0 + total_files = execution.total_files if execution else None + logger.info( + "Sending pipeline status notification for pipeline %s " + "(status=%s, successful=%s, failed=%s)", + self.pipeline, + self.pipeline.last_run_status, + successful_files, + failed_files, + ) payload_dto = PipelineStatusPayload( type=self.pipeline.pipeline_type, pipeline_id=str(self.pipeline.id), @@ -35,8 +83,12 @@ def send(self): status=self.pipeline.last_run_status, execution_id=self.execution_id, error_message=self.error_message, + total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, ) - - NotificationHelper.send_notification( - notifications=self.notifications, payload=payload_dto.to_dict() + dispatch_with_delivery_mode( + list(self.notifications), + payload_dto.to_dict(), + error_context=f"pipeline={self.pipeline.id}", ) diff --git a/backend/workflow_manager/internal_serializers.py b/backend/workflow_manager/internal_serializers.py index bed98c6853..506d389ef8 100644 --- a/backend/workflow_manager/internal_serializers.py +++ b/backend/workflow_manager/internal_serializers.py @@ -178,9 +178,44 @@ class WorkflowExecutionStatusUpdateSerializer(serializers.Serializer): total_files = serializers.IntegerField( required=False, min_value=0 ) # Allow 0 but backend will only update if > 0 + successful_files = serializers.IntegerField(required=False, min_value=0) + failed_files = serializers.IntegerField(required=False, min_value=0) attempts = serializers.IntegerField(required=False, min_value=0) execution_time = serializers.FloatField(required=False, min_value=0) + def validate(self, attrs): + """Reject impossible file-count aggregates. + + Per-field min_value=0 catches negatives, but successful + failed > + total or either component > total slips through and skews the + outcome-based notification filter downstream. + """ + total = attrs.get("total_files") + successful = attrs.get("successful_files") + failed = attrs.get("failed_files") + + if total is None: + if successful is not None or failed is not None: + raise serializers.ValidationError( + { + "total_files": "total_files is required when file aggregates are provided." + } + ) + return attrs + + if successful is not None and successful > total: + raise serializers.ValidationError( + {"successful_files": "successful_files cannot exceed total_files."} + ) + if failed is not None and failed > total: + raise serializers.ValidationError( + {"failed_files": "failed_files cannot exceed total_files."} + ) + if successful is not None and failed is not None and successful + failed > total: + msg = "successful_files + failed_files cannot exceed total_files." + raise serializers.ValidationError({"non_field_errors": msg}) + return attrs + class OrganizationContextSerializer(serializers.Serializer): """Serializer for organization context information.""" diff --git a/backend/workflow_manager/internal_views.py b/backend/workflow_manager/internal_views.py index c822e5e7b5..52f100196a 100644 --- a/backend/workflow_manager/internal_views.py +++ b/backend/workflow_manager/internal_views.py @@ -513,10 +513,19 @@ def update_status(self, request, id=None): increment_attempt=increment_attempt, ) - # Update total_files separately (not handled by update_execution) + # Update total_files / per-file aggregates separately (not handled by update_execution) + update_fields: list[str] = [] if validated_data.get("total_files") is not None: execution.total_files = validated_data["total_files"] - execution.save() + update_fields.append("total_files") + if validated_data.get("successful_files") is not None: + execution.successful_files = validated_data["successful_files"] + update_fields.append("successful_files") + if validated_data.get("failed_files") is not None: + execution.failed_files = validated_data["failed_files"] + update_fields.append("failed_files") + if update_fields: + execution.save(update_fields=update_fields) logger.info( f"Updated workflow execution {id} status to {validated_data['status']}" diff --git a/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py b/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py new file mode 100644 index 0000000000..6dba25f5be --- /dev/null +++ b/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py @@ -0,0 +1,36 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflow_v2", "0019_remove_filehistory_trigram_index"), + ] + + operations = [ + migrations.AddField( + model_name="workflowexecution", + name="successful_files", + field=models.PositiveIntegerField( + blank=True, + null=True, + db_comment=( + "Per-run aggregate of files that completed successfully. " + "Written by the worker callback at terminal state. Null on " + "rows created before this column was added." + ), + ), + ), + migrations.AddField( + model_name="workflowexecution", + name="failed_files", + field=models.PositiveIntegerField( + blank=True, + null=True, + db_comment=( + "Per-run aggregate of files that errored. Written by the " + "worker callback at terminal state. Null on rows created " + "before this column was added." + ), + ), + ), + ] diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 45886bd64e..4e3656b68d 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -16,7 +16,6 @@ from workflow_manager.execution.dto import ExecutionCache from workflow_manager.execution.execution_cache_utils import ExecutionCacheUtils -from workflow_manager.file_execution.models import WorkflowFileExecution from workflow_manager.workflow_v2.enums import ExecutionStatus from workflow_manager.workflow_v2.models import Workflow @@ -174,6 +173,24 @@ class Type(models.TextChoices): total_files = models.PositiveIntegerField( default=0, verbose_name="Total files", db_comment="Number of files to process" ) + successful_files = models.PositiveIntegerField( + null=True, + blank=True, + db_comment=( + "Per-run aggregate of files that completed successfully. Written by " + "the worker callback at terminal state. Null on rows created before " + "this column was added." + ), + ) + failed_files = models.PositiveIntegerField( + null=True, + blank=True, + db_comment=( + "Per-run aggregate of files that errored. Written by the worker " + "callback at terminal state. Null on rows created before this " + "column was added." + ), + ) error_message = models.CharField( max_length=EXECUTION_ERROR_LENGTH, blank=True, @@ -420,16 +437,8 @@ def get_last_run_statuses(cls, pipeline_id: uuid.UUID, limit: int = 5) -> list[d result = [] for e in executions: - # TODO: Optimize by storing successful/failed counts directly in - # WorkflowExecution model. Current approach causes N+1 queries - # (2 queries per execution). Denormalized counts would eliminate - # these queries entirely. - successful = WorkflowFileExecution.objects.filter( - workflow_execution_id=e.id, status="COMPLETED" - ).count() - failed = WorkflowFileExecution.objects.filter( - workflow_execution_id=e.id, status="ERROR" - ).count() + successful = e.successful_files or 0 + failed = e.failed_files or 0 # Compute display_status: PARTIAL_SUCCESS if completed with mixed results display_status = e.status diff --git a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx index 9c207bd1a9..d9577f9c39 100644 --- a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx +++ b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx @@ -1,4 +1,4 @@ -import { Button, Form, Input, Select, Space } from "antd"; +import { Button, Checkbox, Form, Input, Select, Space } from "antd"; import PropTypes from "prop-types"; import { useEffect, useState } from "react"; import { getBackendErrorDetail } from "../../../helpers/GetStaticData"; @@ -12,6 +12,7 @@ const DEFAULT_FORM_DETAILS = { authorization_key: "", is_active: false, max_retries: 0, + notify_on_failures: false, pipeline: "", api: "", url: "", @@ -83,7 +84,8 @@ function CreateNotification({ }, [formDetails]); const handleInputChange = (changedValues, allValues) => { - setFormDetails({ ...formDetails, ...allValues }); + const nextValues = { ...formDetails, ...allValues }; + setFormDetails(nextValues); const changedFieldName = Object.keys(changedValues)[0]; form.setFields([ { @@ -220,6 +222,17 @@ function CreateNotification({ ), )} + + + + Notify on failures only + + + diff --git a/frontend/src/components/pipelines-or-deployments/notification-modal/DisplayNotifications.jsx b/frontend/src/components/pipelines-or-deployments/notification-modal/DisplayNotifications.jsx index 9754dd20ab..6860bf0258 100644 --- a/frontend/src/components/pipelines-or-deployments/notification-modal/DisplayNotifications.jsx +++ b/frontend/src/components/pipelines-or-deployments/notification-modal/DisplayNotifications.jsx @@ -91,7 +91,7 @@ function DisplayNotifications({ indicator: , spinning: isLoading, }} - pagination={{ pageSize: 5 }} + pagination={false} /> ); diff --git a/frontend/src/components/settings/platform/PlatformSettings.css b/frontend/src/components/settings/platform/PlatformSettings.css index 4ad6f4bf8e..da0da79318 100644 --- a/frontend/src/components/settings/platform/PlatformSettings.css +++ b/frontend/src/components/settings/platform/PlatformSettings.css @@ -2,7 +2,7 @@ .plt-set-layout { height: calc(100vh - 60px); - background-color: #ffffff; + background-color: var(--page-bg-3); } .plt-set-layout-2 { @@ -10,9 +10,10 @@ } .plt-set-head { - background-color: #f5f7f9; + background-color: var(--white); padding: 14px; height: 60px; + border-bottom: 1px solid var(--border-color-1); } .plt-set-head-typo { @@ -20,14 +21,56 @@ font-weight: 600; } +.plt-set-section { + margin-bottom: 16px; +} + +.plt-set-section > .ant-typography { + margin-top: 0; +} + +.plt-set-section-subtitle { + display: block; + margin-bottom: 8px; + font-size: 12px; +} + +.plt-set-inner-card { + background-color: #f5f7f9; + border: 1px solid #e5e7eb; + border-radius: 8px; + padding: 16px; +} + .plt-set-key-head { + display: flex; + align-items: center; + gap: 8px; margin-bottom: 8px; } -.plt-set-key-head-col-1 { - padding-right: 16px; +.plt-set-key-pill-clickable { + cursor: pointer; } .plt-set-key-display { width: 300px; } + +.plt-set-notif-field-label { + display: block; + margin-bottom: 8px; + font-size: 13px; +} + +.plt-set-notif-field-row { + display: flex; + align-items: center; + gap: 8px; +} + +.plt-set-notif-helper { + display: block; + margin-top: 6px; + font-size: 12px; +} diff --git a/frontend/src/components/settings/platform/PlatformSettings.jsx b/frontend/src/components/settings/platform/PlatformSettings.jsx index f255da6a73..88a4613e78 100644 --- a/frontend/src/components/settings/platform/PlatformSettings.jsx +++ b/frontend/src/components/settings/platform/PlatformSettings.jsx @@ -3,7 +3,16 @@ import { CopyOutlined, DeleteOutlined, } from "@ant-design/icons"; -import { Button, Col, Divider, Input, Radio, Row, Typography } from "antd"; +import { + Button, + Col, + Divider, + Input, + InputNumber, + Row, + Tag, + Typography, +} from "antd"; import { useEffect, useState } from "react"; import { useNavigate } from "react-router-dom"; @@ -38,6 +47,9 @@ function PlatformSettings() { const [keys, setKeys] = useState(defaultKeys); const [isLoadingIndex, setLoadingIndex] = useState(null); const [isDeletingIndex, setDeletingIndex] = useState(null); + // UI shows minutes; wire format (and ConfigSpec.value) is seconds. + const [batchIntervalMinutes, setBatchIntervalMinutes] = useState(null); + const [isSavingInterval, setIsSavingInterval] = useState(false); const { sessionDetails } = useSessionStore(); const { setAlertDetails } = useAlertStore(); const axiosPrivate = useAxiosPrivate(); @@ -45,6 +57,67 @@ function PlatformSettings() { const handleException = useExceptionHandler(); const { setPostHogCustomEvent } = usePostHogEvents(); + useEffect(() => { + // Wait for session hydration — without this guard the first render + // fires GET against /api/v1/unstract/undefined/... and silently 404s. + if (!sessionDetails?.orgId) { + return; + } + // Load org-scoped batch interval (UNS-611 v2). Falls back silently to + // null on failure so the rest of the page still renders. + axiosPrivate({ + method: "GET", + url: `/api/v1/unstract/${sessionDetails?.orgId}/notifications/settings/`, + }) + .then((res) => { + const seconds = res?.data?.club_interval_seconds; + if (typeof seconds === "number" && seconds > 0) { + setBatchIntervalMinutes(Math.round(seconds / 60)); + } + }) + .catch(() => { + // Non-fatal — admin just won't see a pre-filled value. + }); + }, [sessionDetails?.orgId]); + + const handleSaveInterval = () => { + if ( + !batchIntervalMinutes || + batchIntervalMinutes < 1 || + batchIntervalMinutes > 120 + ) { + setAlertDetails({ + type: "error", + content: "Notification interval must be between 1 and 120 minutes.", + }); + return; + } + setIsSavingInterval(true); + axiosPrivate({ + method: "PATCH", + url: `/api/v1/unstract/${sessionDetails?.orgId}/notifications/settings/`, + headers: { + "X-CSRFToken": sessionDetails?.csrfToken, + "Content-Type": "application/json", + }, + data: { club_interval_seconds: batchIntervalMinutes * 60 }, + }) + .then(() => { + setAlertDetails({ + type: "success", + content: "Notification batch interval updated.", + }); + }) + .catch((err) => { + setAlertDetails( + handleException(err, "Failed to update batch interval"), + ); + }) + .finally(() => { + setIsSavingInterval(false); + }); + }; + useEffect(() => { const requestOptions = { method: "GET", @@ -252,84 +325,133 @@ function PlatformSettings() {
-
- {keys.map((keyDetails, keyIndex) => { - return ( -
-
-
- - -
- - {keyDetails?.keyName} - -
- - -
- handleToggle(keyIndex)} - > - Active Key - -
- -
-
+
+ Internal API Keys + + Authenticate platform-to-platform requests. Keep these values + secret. + +
+ {keys.map((keyDetails, keyIndex) => { + const isActive = + Boolean(keyDetails?.id) && activeKey === keyIndex; + const canActivate = keyDetails?.id !== null; + return ( +
- - -
- - copyText(keys[keyIndex].key) - } - /> - } - /> -
- - - - - - handleDelete(keyIndex)} - content="Want to delete this platform key? This action cannot be undone." - okText="Delete" +
+ + {keyDetails?.keyName} + + {isActive ? ( + Active + ) : ( + handleToggle(keyIndex) + : undefined + } > + Inactive + + )} +
+
+ + +
+ + copyText(keys[keyIndex].key) + } + /> + } + /> +
+ + + + + handleDelete(keyIndex)} + content="Want to delete this platform key? This action cannot be undone." + okText="Delete" + > +
+ {keyIndex < keys?.length - 1 && }
- {keyIndex < keys?.length - 1 && } -
- ); - })} + ); + })} +
+
+
+ Notifications + + Control how often the platform notifies you about activity. + +
+ + Notification interval + +
+ setBatchIntervalMinutes(v)} + /> + +
+ + Allowed: 1 to 120 minutes. Default: 5 minutes. + +
diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py index 7e8e984a04..7642aa9ce1 100644 --- a/unstract/core/src/unstract/core/data_models.py +++ b/unstract/core/src/unstract/core/data_models.py @@ -512,6 +512,8 @@ class NotificationPayload: # Metadata timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) + # Per-run file aggregates (total/successful/failed) are nested here + # so receivers see them grouped rather than as top-level keys. additional_data: dict[str, Any] = field(default_factory=dict) # Internal tracking (not sent to external webhooks) @@ -565,6 +567,9 @@ def from_execution_status( error_message: str | None = None, organization_id: str | None = None, additional_data: dict[str, Any] | None = None, + total_files: int = 0, + successful_files: int = 0, + failed_files: int = 0, ) -> "NotificationPayload": """Create notification payload from execution status. @@ -599,6 +604,17 @@ def from_execution_status( f"Cannot create notification for non-final status: {execution_status}" ) + # File counts are bundled inside additional_data so receivers see + # them grouped (e.g. Slack renders one "Additional Data" section). + # Caller-supplied additional_data takes precedence on key conflict. + merged_additional = { + "total_files": total_files, + "successful_files": successful_files, + "failed_files": failed_files, + } + if additional_data: + merged_additional.update(additional_data) + return cls( type=workflow_type, pipeline_id=pipeline_id, @@ -607,7 +623,7 @@ def from_execution_status( execution_id=execution_id, error_message=error_message, organization_id=organization_id, - additional_data=additional_data or {}, + additional_data=merged_additional, _source=source, ) diff --git a/unstract/core/src/unstract/core/notification_clubbed_renderer.py b/unstract/core/src/unstract/core/notification_clubbed_renderer.py new file mode 100644 index 0000000000..9d8ce709a9 --- /dev/null +++ b/unstract/core/src/unstract/core/notification_clubbed_renderer.py @@ -0,0 +1,202 @@ +"""Shared clubbed-notification envelope + Slack renderer. + +Imported by both `backend/notification_v2/clubbed_renderer.py` and the +worker `notification/providers/*_webhook.py` so the receiver-visible +payload (envelope JSON for API, mrkdwn string for Slack) is byte-identical +regardless of which side rendered it. + +Envelope shape: + + { + "summary": {"total": N, "succeeded": S, "failed": F}, + "events": [ + { + "type": "ETL" | "TASK" | "API", + "pipeline_name": "...", + "status": "ERROR" | "SUCCESS" | ..., + "execution_id": "...", + "timestamp": "2026 May 5 5:03:34 PM", + "additional_data": { + "total_files": int, + "successful_files": int, + "failed_files": int, + }, + "error_message": "...", # only on failure + }, + ... + ] + } + +`pipeline_id` is intentionally absent — neither channel surfaces it. +""" + +from __future__ import annotations + +import datetime +from typing import Any + +# Hard cap on events per dispatch; the rest roll into the next flush tick. +MAX_BATCH_SIZE = 500 +# Slack inlines this many events before collapsing the rest under an +# "_… and K more_" footer. Slack tolerates much larger payloads, but +# readability tanks past ~25 lines. +SLACK_MAX_DISPLAY_EVENTS = 25 + +_SUCCESS_STATUSES = frozenset({"COMPLETED", "SUCCESS"}) + +# Middle dot (U+00B7) padded by single spaces — the per-event field separator. +_SEPARATOR = " · " +_MISSING = "—" # em-dash placeholder for missing fields +_DIVIDER = "———" # triple em-dash divider between header and events + +# Slack emoji shortcodes — render the same as the literal unicode glyphs and +# stay readable in source. +_EMOJI_SUCCESS = ":white_check_mark:" +_EMOJI_FAILURE = ":x:" + + +def _is_success(status: str | None) -> bool: + if not status: + return False + return status.upper() in _SUCCESS_STATUSES + + +def _has_failed_files(counts: dict[str, Any]) -> bool: + """True when file-level aggregates show at least one failure.""" + failed = counts.get("failed_files") + return isinstance(failed, int) and failed > 0 + + +def _is_effective_success(status: str | None, counts: dict[str, Any]) -> bool: + """Treat a COMPLETED run with any file failures as a partial failure. + + Mirrors the failure-filter contract at dispatch time so renderer summary + counts and the per-event file-count emoji match the reason the alert + fired. + """ + return _is_success(status) and not _has_failed_files(counts) + + +def _humanize_timestamp(iso: str | None) -> str: + """Render an ISO timestamp as `2026 May 11 11:38:31 AM` (POSIX `%-d`). + + Falls back to the missing placeholder on falsy / unparseable input so a + partial row still renders without raising. + """ + if not iso: + return _MISSING + try: + dt = datetime.datetime.fromisoformat(iso) + except (TypeError, ValueError): + return _MISSING + return dt.strftime("%Y %b %-d %I:%M:%S %p") + + +def _format_file_count(event: dict[str, Any]) -> str: + """Render the file-count summary; empty string when no totals available. + + A COMPLETED run with file failures short-circuits to the failure shape so + the rendered line matches why a failures-only notification fired. + """ + counts = event.get("additional_data") or {} + total = counts.get("total_files") + if total is None: + return "" + if _has_failed_files(counts): + failed = counts.get("failed_files", 0) + return f"{_EMOJI_FAILURE} {failed}/{total} files" + if _is_success(event.get("status")): + successful = counts.get("successful_files", 0) + return f"{_EMOJI_SUCCESS} {successful}/{total} files" + failed = counts.get("failed_files", 0) + return f"{_EMOJI_FAILURE} {failed}/{total} files" + + +def _format_event_line(event: dict[str, Any]) -> str: + """Format one event as a single Slack mrkdwn line. + + Fields are middle-dot separated; the file-count column is omitted when + `additional_data` is empty so the line collapses to 5 fields, not 6. + """ + parts = [ + event.get("timestamp") or _MISSING, + f"*{event.get('execution_id') or _MISSING}*", + event.get("type") or _MISSING, + event.get("pipeline_name") or _MISSING, + event.get("status") or _MISSING, + ] + file_count = _format_file_count(event) + if file_count: + parts.append(file_count) + return _SEPARATOR.join(parts) + + +def _event_from_payload(payload: dict[str, Any]) -> dict[str, Any]: + """Project a buffered payload into the canonical per-event dict. + + Unified shape across Slack/API and every dispatch path. `pipeline_id` + is intentionally dropped — neither channel surfaces it. Timestamps are + humanized once at projection so Slack and API consumers see the same + string (implicit UTC, no timezone suffix). + """ + event: dict[str, Any] = { + "type": payload.get("type") or "", + "pipeline_name": payload.get("pipeline_name") or "", + "status": payload.get("status") or "", + "execution_id": payload.get("execution_id") or "", + "timestamp": _humanize_timestamp(payload.get("timestamp")), + "additional_data": payload.get("additional_data") or {}, + } + error_message = payload.get("error_message") + if error_message: + event["error_message"] = error_message + return event + + +def build_envelope(payloads: list[dict[str, Any]]) -> dict[str, Any]: + """Build the canonical envelope used by every dispatch path. + + Summary carries only `{total, succeeded, failed}` — one envelope shape + so receivers parse a single schema, not two. + """ + capped = payloads[:MAX_BATCH_SIZE] + succeeded = sum( + 1 + for p in capped + if _is_effective_success(p.get("status"), p.get("additional_data") or {}) + ) + failed = len(capped) - succeeded + return { + "summary": { + "total": len(capped), + "succeeded": succeeded, + "failed": failed, + }, + "events": [_event_from_payload(p) for p in capped], + } + + +def render_slack_text(envelope: dict[str, Any]) -> str: + """Render the envelope as Slack mrkdwn body text. + + Header + divider are emitted for every dispatch — single-event and + multi-event batches share the same shape. Visible events are capped at + ``SLACK_MAX_DISPLAY_EVENTS`` with an `_… and K more_` overflow footer. + """ + summary = envelope["summary"] + events: list[dict[str, Any]] = envelope["events"] + total = summary["total"] + noun = "execution" if total == 1 else "executions" + header = ( + f"*{total} {noun}* " + f"({_EMOJI_SUCCESS} {summary['succeeded']} succeeded " + f"{_EMOJI_FAILURE} {summary['failed']} failed)" + ) + visible = events[:SLACK_MAX_DISPLAY_EVENTS] + sections: list[str] = [header, _DIVIDER] + sections.extend(_format_event_line(e) for e in visible) + overflow = len(events) - len(visible) + if overflow > 0: + sections.append(_DIVIDER) + sections.append(f"_… and {overflow} more executions_") + return "\n".join(sections) diff --git a/workers/callback/tasks.py b/workers/callback/tasks.py index 42599f0659..eee5b97da3 100644 --- a/workers/callback/tasks.py +++ b/workers/callback/tasks.py @@ -381,12 +381,16 @@ def _update_execution_status_unified( try: # Consistent workflow execution status update across all callback types total_files = aggregated_results.get("total_files", 0) + successful_files = aggregated_results.get("successful_files", 0) + failed_files = aggregated_results.get("failed_files", 0) # Make the unified API call api_client.update_workflow_execution_status( execution_id=execution_id, status=final_status, total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, organization_id=organization_id, error_message=error_message, ) diff --git a/workers/log_consumer/process_notification_buffer.py b/workers/log_consumer/process_notification_buffer.py new file mode 100755 index 0000000000..7f93a8dfe6 --- /dev/null +++ b/workers/log_consumer/process_notification_buffer.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +"""Trigger backend processing of the notification buffer. + +Mirrors process_log_history.py: a thin wrapper around an internal API call +that the log_consumer scheduler.sh fires on every tick. The backend owns the +actual GROUP BY / SKIP LOCKED / dispatch logic. Idempotent — safe to run +under multiple replicas (the backend's row-level lock prevents duplicate +dispatch). + +Usage: + python process_notification_buffer.py +""" + +import logging +import os +import sys + +import httpx + +logger = logging.getLogger(__name__) + +# Endpoint must match the URL registered in +# backend/notification_v2/internal_urls.py + backend/backend/internal_base_urls.py +PROCESS_BUFFER_ENDPOINT = "v1/webhook/buffer/process/" + + +def process_notification_buffer() -> bool: + """Hit the backend's process-buffer endpoint; return True on success. + + Returns False on auth/network failure so the calling scheduler can log + the failure and proceed to the next tick. Never raises — the scheduler + is supposed to keep ticking. + """ + internal_api_base_url = os.getenv("INTERNAL_API_BASE_URL") + internal_api_key = os.getenv("INTERNAL_SERVICE_API_KEY") + + if not internal_api_base_url: + logger.error("INTERNAL_API_BASE_URL environment variable not set") + return False + if not internal_api_key: + logger.error("INTERNAL_SERVICE_API_KEY environment variable not set") + return False + + url = f"{internal_api_base_url.rstrip('/')}/{PROCESS_BUFFER_ENDPOINT}" + # Longer timeout than process_log_history (60s vs 30s): a flush tick can + # involve multiple Celery dispatches, GC, and per-group rendering. + transport = httpx.HTTPTransport(retries=3) + try: + with httpx.Client(transport=transport) as client: + response = client.post( + url, + headers={"Authorization": f"Bearer {internal_api_key}"}, + timeout=60.0, + ) + except httpx.HTTPError as e: + logger.error("HTTP error calling process-buffer: %s", e) + return False + except Exception: + logger.exception("Unexpected error calling process-buffer") + return False + + if response.status_code != 200: + logger.error( + "Backend returned status %s on process-buffer: %s", + response.status_code, + response.text[:500], + ) + return False + + result = response.json() + if result.get("dispatched_groups", 0) > 0 or result.get("gc_deleted_rows", 0) > 0: + logger.info( + "Notification buffer flush: groups=%s rows=%s gc=%s", + result.get("dispatched_groups", 0), + result.get("dispatched_rows", 0), + result.get("gc_deleted_rows", 0), + ) + return True + + +if __name__ == "__main__": + success = process_notification_buffer() + sys.exit(0 if success else 1) diff --git a/workers/log_consumer/scheduler.sh b/workers/log_consumer/scheduler.sh index a5572b21eb..0bf7dca96d 100755 --- a/workers/log_consumer/scheduler.sh +++ b/workers/log_consumer/scheduler.sh @@ -1,19 +1,26 @@ #!/usr/bin/env bash -set -euo pipefail +set -uo pipefail +# Note: pipefail without -e — one task's failure must not abort the loop. INTERVAL="${LOG_HISTORY_CONSUMER_INTERVAL:-5}" -TASK_NAME="process_log_history" -# Task trigger command - can be overridden via environment variable -DEFAULT_TRIGGER_CMD="/app/.venv/bin/python /app/log_consumer/process_log_history.py" -TRIGGER_CMD="${TASK_TRIGGER_COMMAND:-$DEFAULT_TRIGGER_CMD}" +# Task 1: log history consumer (existing). +DEFAULT_LOG_HISTORY_CMD="/app/.venv/bin/python /app/log_consumer/process_log_history.py" +LOG_HISTORY_CMD="${TASK_TRIGGER_COMMAND:-$DEFAULT_LOG_HISTORY_CMD}" + +# Task 2: notification buffer flush (UNS-611 clubbed dispatch). +# The endpoint short-circuits on an empty PENDING set, so polling on the same +# 5s tick is cheap. Real dispatch cadence is gated by NOTIFICATION_CLUB_INTERVAL +# on the backend (rows precompute flush_after at enqueue time). +DEFAULT_BUFFER_FLUSH_CMD="/app/.venv/bin/python /app/log_consumer/process_notification_buffer.py" +BUFFER_FLUSH_CMD="${NOTIFICATION_BUFFER_TASK_COMMAND:-$DEFAULT_BUFFER_FLUSH_CMD}" echo "==========================================" -echo "Log History Scheduler Starting" +echo "Log Consumer Scheduler Starting" echo "==========================================" -echo "Task: ${TASK_NAME}" echo "Interval: ${INTERVAL} seconds" -echo "Trigger Command: ${TRIGGER_CMD}" +echo "Task 1 (log history): ${LOG_HISTORY_CMD}" +echo "Task 2 (notification buffer flush): ${BUFFER_FLUSH_CMD}" echo "==========================================" cleanup() { @@ -27,20 +34,27 @@ cleanup() { trap cleanup SIGTERM SIGINT +run_task() { + # $1 = display name, $2 = command. Returns the command's exit code but + # never propagates failure — the caller logs it and moves on. + local task_name="$1" + local cmd="$2" + echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] Triggering ${task_name}..." + if eval "${cmd}" 2>&1; then + echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] ✓ ${task_name} OK" + else + local exit_code=$? + echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] ✗ ${task_name} failed with exit code ${exit_code}" + fi +} + run_count=0 while true; do run_count=$((run_count + 1)) - echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] Triggering ${TASK_NAME}..." - - if eval "${TRIGGER_CMD}" 2>&1; then - echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] ✓ Task completed successfully" - else - exit_code=$? - echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] ✗ Task failed with exit code ${exit_code}" - echo "[$(date '+%Y-%m-%d %H:%M:%S')] [Run #${run_count}] Will retry after ${INTERVAL} seconds" - fi + run_task "process_log_history" "${LOG_HISTORY_CMD}" + run_task "process_notification_buffer" "${BUFFER_FLUSH_CMD}" echo "[$(date '+%Y-%m-%d %H:%M:%S')] Sleeping for ${INTERVAL} seconds..." echo "" diff --git a/workers/notification/providers/api_webhook.py b/workers/notification/providers/api_webhook.py index 8c54d2d19d..c6961e7227 100644 --- a/workers/notification/providers/api_webhook.py +++ b/workers/notification/providers/api_webhook.py @@ -1,6 +1,9 @@ """API Webhook Notification Provider -Standard API webhook provider for generic webhook endpoints. +Wraps worker-callback payloads (flat per-event dict) in the canonical +envelope so API webhook receivers always see the same +``{"summary": {...}, "events": [...]}`` shape. Backend dispatches already +arrive in envelope form and pass through. """ from typing import Any @@ -8,17 +11,20 @@ from notification.providers.webhook_provider import WebhookProvider from shared.infrastructure.logging import WorkerLogger +from unstract.core.notification_clubbed_renderer import build_envelope + logger = WorkerLogger.get_logger(__name__) class APIWebhook(WebhookProvider): """Standard API webhook provider. - Handles generic webhook notifications without platform-specific formatting. - Sends the payload as-is in JSON format. + Normalises the payload to the canonical envelope before POSTing so + programmatic consumers parse one schema regardless of how the + notification was produced. """ - def __init__(self): + def __init__(self) -> None: """Initialize API webhook provider.""" super().__init__() self.provider_name = "APIWebhook" @@ -26,16 +32,15 @@ def __init__(self): def prepare_data(self, notification_data: dict[str, Any]) -> dict[str, Any]: """Prepare API webhook data. - For standard API webhooks, we send the payload as-is without - any special formatting. + Wraps a flat per-event payload in the canonical envelope; payloads + already in envelope shape (backend-built) pass through. + """ + prepared_data = super().prepare_data(notification_data) - Args: - notification_data: Raw notification data + if "payload" in prepared_data: + payload = prepared_data["payload"] + if isinstance(payload, dict) and "events" not in payload: + prepared_data["payload"] = build_envelope(payloads=[payload]) - Returns: - Prepared notification data - """ - logger.debug( - f"Preparing standard API webhook data for {notification_data.get('url')}" - ) - return super().prepare_data(notification_data) + logger.debug(f"Prepared API webhook data for {notification_data.get('url')}") + return prepared_data diff --git a/workers/notification/providers/slack_webhook.py b/workers/notification/providers/slack_webhook.py index 89206646a5..1028aef289 100644 --- a/workers/notification/providers/slack_webhook.py +++ b/workers/notification/providers/slack_webhook.py @@ -1,7 +1,8 @@ """Slack Webhook Notification Provider -This provider handles Slack-specific webhook notifications with proper -payload formatting for Slack's Block Kit API. +Renders worker-callback payloads (flat per-event dict) into the same +single-line Slack body the backend produces via ``clubbed_renderer``. +Backend-rendered payloads (`{"text": ""}`) pass through unchanged. """ from typing import Any @@ -9,17 +10,23 @@ from notification.providers.webhook_provider import WebhookProvider from shared.infrastructure.logging import WorkerLogger +from unstract.core.notification_clubbed_renderer import ( + build_envelope, + render_slack_text, +) + logger = WorkerLogger.get_logger(__name__) class SlackWebhook(WebhookProvider): """Slack-specific webhook provider. - Formats payloads according to Slack's expected structure, - including support for Block Kit formatting. + Renders flat per-event payloads via the worker-side mirror of the + backend clubbed renderer, then sends them as Slack-native ``text`` + mrkdwn. """ - def __init__(self): + def __init__(self) -> None: """Initialize Slack webhook provider.""" super().__init__() self.provider_name = "SlackWebhook" @@ -27,9 +34,6 @@ def __init__(self): def prepare_data(self, notification_data: dict[str, Any]) -> dict[str, Any]: """Prepare Slack-specific webhook data. - Formats the payload to match Slack's expected structure - with 'text' field and optional Block Kit blocks. - Args: notification_data: Raw notification data @@ -38,7 +42,6 @@ def prepare_data(self, notification_data: dict[str, Any]) -> dict[str, Any]: """ prepared_data = super().prepare_data(notification_data) - # Format payload for Slack if "payload" in prepared_data: prepared_data["payload"] = self.format_payload(prepared_data["payload"]) @@ -47,171 +50,18 @@ def prepare_data(self, notification_data: dict[str, Any]) -> dict[str, Any]: def format_payload(self, payload: dict[str, Any]) -> dict[str, Any]: """Format the payload to match Slack's expected structure. - Args: - payload: Original payload - - Returns: - Slack-formatted payload with 'text' field and optional blocks - """ - # If payload already has 'text' field, enhance it with blocks - if "text" in payload: - formatted_payload = { - "text": payload.pop("text"), - "blocks": self.create_blocks_from_payload(payload), - } - else: - # Construct a Slack message from the payload - formatted_payload = { - "text": self._get_summary_text(payload), - "blocks": self.create_blocks_from_payload(payload), - } - - return formatted_payload - - def create_blocks_from_payload(self, payload: dict[str, Any]) -> list[dict[str, Any]]: - """Create Slack Block Kit blocks from the payload. - - Args: - payload: Payload to convert to blocks - - Returns: - List of Slack Block Kit blocks - """ - blocks = [] - - # Header block - blocks.append( - { - "type": "section", - "text": {"type": "mrkdwn", "text": "*Unstract Notification*"}, - } - ) - - # Add divider for visual separation - blocks.append({"type": "divider"}) - - # Add each key-value pair as a section - for key, value in payload.items(): - if value is None or value == "": - continue - - # Format key for display - formatted_key = self._format_key(key) - - # Format value based on type - formatted_value = self._format_value(value) - - # Create section block with inline format - blocks.append( - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": f"*{formatted_key}:* {formatted_value}", - }, - } - ) - - # Add timestamp footer if not already present - if not any("timestamp" in str(block).lower() for block in blocks): - from datetime import datetime - - blocks.append({"type": "divider"}) - blocks.append( - { - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": f"_Sent at {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}_", - } - ], - } - ) - - return blocks - - def _get_summary_text(self, payload: dict[str, Any]) -> str: - """Generate summary text from payload. - - Args: - payload: Payload to summarize - - Returns: - Summary text for Slack notification - """ - # Priority order for summary fields - summary_fields = [ - "message", - "status", - "pipeline_name", - "workflow_name", - "api_name", - "error", - "result", - "summary", - ] - - for field in summary_fields: - if field in payload and payload[field]: - return str(payload[field]) - - # Default summary - return "Unstract Notification" - - def _format_key(self, key: str) -> str: - """Format dictionary key for display. - - Args: - key: Raw key name - - Returns: - Formatted key for display + Two input shapes are accepted: + - Backend-rendered ``{"text": ""}`` (any backend dispatch + through ``clubbed_renderer``) — passed through. + - Flat per-event dict from a worker callback — wrapped in a + single-event envelope and rendered to the canonical single-line + mrkdwn body. """ - # Replace underscores with spaces and capitalize - formatted = key.replace("_", " ").title() - - # Special formatting for known keys - key_mapping = { - "Pipeline Name": "Pipeline Name", - "Api Name": "API Name", - "Workflow Name": "Workflow Name", - "Status": "Status", - "Error": "Error", - "Success": "Success", - "Execution Id": "Execution Id", - "Organization Id": "Organization Id", - } - - return key_mapping.get(formatted, formatted) - - def _format_value(self, value: Any) -> str: - """Format value for Slack display. - - Args: - value: Value to format + if "text" in payload and "events" not in payload: + return {"text": payload["text"]} - Returns: - Formatted value string - """ - if isinstance(value, bool): - return "✅ Yes" if value else "❌ No" - elif isinstance(value, (list, tuple)): - return "\n• " + "\n• ".join(str(item) for item in value) - elif isinstance(value, dict): - # Format nested dictionary - items = [] - for k, v in value.items(): - items.append(f" • {self._format_key(k)}: {v}") - return "\n" + "\n".join(items) - elif value is None: - return "_Not specified_" - else: - # Format long strings - value_str = str(value) - if len(value_str) > 500: - return value_str[:497] + "..." - return value_str + envelope = build_envelope(payloads=[payload]) + return {"text": render_slack_text(envelope)} def get_destination(self, notification_data: dict[str, Any]) -> str: """Extract webhook URL from notification data with masking for security.""" @@ -230,4 +80,4 @@ def get_destination(self, notification_data: dict[str, Any]) -> str: # Mask long URLs that might contain tokens return url[:30] + "..." + url[-10:] - return url + return str(url) diff --git a/workers/scheduler/tasks.py b/workers/scheduler/tasks.py index 2e22946a58..65e5bd7af1 100644 --- a/workers/scheduler/tasks.py +++ b/workers/scheduler/tasks.py @@ -81,6 +81,7 @@ def _send_pipeline_status_notification( pipeline_id=pipeline_id, pipeline_name=pipeline_name, notification_payload=notification, + execution_id=execution_id, ) logger.info(f"Notification sent successfully for {pipeline_type} {pipeline_id}") except Exception as notification_error: diff --git a/workers/shared/api/internal_client.py b/workers/shared/api/internal_client.py index def90bd86a..4964a2b377 100644 --- a/workers/shared/api/internal_client.py +++ b/workers/shared/api/internal_client.py @@ -603,6 +603,8 @@ def update_workflow_execution_status( status: str, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -613,6 +615,8 @@ def update_workflow_execution_status( status, error_message, total_files, + successful_files, + failed_files, attempts, execution_time, organization_id, diff --git a/workers/shared/clients/execution_client.py b/workers/shared/clients/execution_client.py index 80e9ca6568..e1373eb9f5 100644 --- a/workers/shared/clients/execution_client.py +++ b/workers/shared/clients/execution_client.py @@ -265,6 +265,8 @@ def update_workflow_execution_status( status: str | TaskStatus, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -276,6 +278,8 @@ def update_workflow_execution_status( status: New status (TaskStatus enum or string) error_message: Optional error message total_files: Optional total files count + successful_files: Optional count of files that completed successfully + failed_files: Optional count of files that errored attempts: Optional attempts count execution_time: Optional execution time organization_id: Optional organization ID override @@ -292,6 +296,10 @@ def update_workflow_execution_status( data["error_message"] = error_message if total_files is not None: data["total_files"] = total_files + if successful_files is not None: + data["successful_files"] = successful_files + if failed_files is not None: + data["failed_files"] = failed_files if attempts is not None: data["attempts"] = attempts if execution_time is not None: diff --git a/workers/shared/patterns/notification/helper.py b/workers/shared/patterns/notification/helper.py index 977f5a875f..ed00475ea0 100644 --- a/workers/shared/patterns/notification/helper.py +++ b/workers/shared/patterns/notification/helper.py @@ -5,8 +5,7 @@ """ import logging - -from celery import current_app +from typing import Any # Import shared data models from @unstract/core from unstract.core.data_models import ( @@ -18,85 +17,92 @@ logger = logging.getLogger(__name__) - -def get_webhook_headers( - auth_type: str, auth_key: str | None, auth_header: str | None -) -> dict[str, str]: - """Generate webhook headers based on authorization configuration.""" - headers = {"Content-Type": "application/json"} - - try: - if auth_type and auth_key: - auth_type_upper = auth_type.upper() - - if auth_type_upper == "BEARER": - headers["Authorization"] = f"Bearer {auth_key}" - elif auth_type_upper == "API_KEY": - headers["Authorization"] = auth_key - elif auth_type_upper == "CUSTOM_HEADER" and auth_header: - headers[auth_header] = auth_key - # NONE type just uses Content-Type header - except Exception as e: - logger.warning(f"Error generating webhook headers: {e}") - # Use default headers if auth config is invalid - - return headers +ENQUEUE_BUFFER_ENDPOINT = "v1/webhook/buffer/enqueue/" -def send_notification_to_worker( - url: str, +def _enqueue_to_buffer( + api_client: Any, + notification: dict[str, Any], payload: NotificationPayload, - auth_type: str, - auth_key: str | None, - auth_header: str | None, - max_retries: int = 0, - platform: str | None = None, -) -> bool: - """Send a single notification to the notification worker queue. +) -> None: + """POST a single execution event to the backend's buffer endpoint. - Args: - url: Webhook URL to send notification to - payload: Structured notification payload - auth_type: Authorization type (NONE, BEARER, API_KEY, CUSTOM_HEADER) - auth_key: Authorization key/token - auth_header: Custom header name for CUSTOM_HEADER auth type - max_retries: Maximum number of retry attempts - platform: Platform type from notification config (SLACK, API, etc.) - - Returns: - True if task was successfully queued, False otherwise + Worker writes nothing to the DB itself — the backend owns NotificationBuffer + rows. Raises on any failure so the outer trigger_* caller's except block + logs the drop instead of silently treating BATCHED delivery as successful. """ + # Forward the full per-event shape so the backend renderer can match + # the canonical KV layout per event (Type / Pipeline Id / Pipeline Name / + # Status / Execution Id / Timestamp / Additional Data). Older backend + # builds that ignore the extra fields stay unaffected. + payload_type = payload.type.value if hasattr(payload.type, "value") else payload.type + payload_status = ( + payload.status.value if hasattr(payload.status, "value") else payload.status + ) + payload_timestamp = payload.timestamp.isoformat() if payload.timestamp else None try: - headers = get_webhook_headers(auth_type, auth_key, auth_header) - - # Convert payload to webhook format (excludes internal fields) - payload_dict = payload.to_webhook_payload() - - # Send task to notification worker - current_app.send_task( - "send_webhook_notification", - args=[ - url, - payload_dict, - headers, - 10, # timeout - ], - kwargs={ - "max_retries": max_retries, - "retry_delay": 10, - "platform": platform, + api_client._make_request( + method="POST", + endpoint=ENQUEUE_BUFFER_ENDPOINT, + data={ + "notification_id": notification["id"], + "type": payload_type, + "execution_id": payload.execution_id, + "pipeline_id": payload.pipeline_id, + "pipeline_name": payload.pipeline_name, + "status": payload_status, + "error_message": payload.error_message, + "platform": notification.get("platform"), + "timestamp": payload_timestamp, + "additional_data": payload.additional_data or {}, }, - queue="notifications", + timeout=10, + ) + # Propagate any failure; caller decides whether to continue iteration. + except Exception: # noqa: BLE001 + logger.exception( + "Failed to enqueue BATCHED notification %s for pipeline %s", + notification["id"], + payload.pipeline_id, ) + raise + logger.info( + "Enqueued BATCHED notification %s for pipeline %s execution %s", + notification["id"], + payload.pipeline_id, + payload.execution_id, + ) + + +def _route_notification( + api_client: Any, + notification: dict[str, Any], + payload: NotificationPayload, +) -> None: + """Forward webhook notifications to the backend buffer-enqueue endpoint. - logger.info( - f"Sent webhook notification to worker queue for {url} (pipeline: {payload.pipeline_id})" + Single dispatch path: the backend owns the buffer and the periodic + flush ships clubbed messages. Non-webhook notification types are + skipped at this layer. An enqueue failure is logged but doesn't abort + the outer trigger_* loop so sibling notifications still get their + chance. + """ + if notification.get("notification_type") != "WEBHOOK": + logger.debug( + "Skipping non-webhook notification type: %s", + notification.get("notification_type"), ) - return True + return - except Exception as e: - logger.error(f"Failed to send notification to {url}: {e}") - return False + try: + _enqueue_to_buffer(api_client, notification, payload) + # Already logged with stack inside _enqueue_to_buffer; broad catch keeps + # sibling notifications going. + except Exception: # noqa: BLE001 + logger.warning( + "Buffer enqueue failed for notification %s; continuing with others", + notification.get("id"), + ) def trigger_notification( @@ -104,6 +110,7 @@ def trigger_notification( pipeline_id: str, pipeline_name: str, notification_payload: NotificationPayload, + execution_id: str | None = None, ) -> None: """Trigger notifications for pipeline status updates. @@ -111,10 +118,13 @@ def trigger_notification( Uses API client to fetch notification configuration. """ try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend filter respects notify_on_failures + # (see trigger_pipeline_notifications for the rationale). + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -135,20 +145,7 @@ def trigger_notification( # Send each notification for notification in active_notifications: - if notification.get("notification_type") == "WEBHOOK": - send_notification_to_worker( - url=notification["url"], - payload=notification_payload, - auth_type=notification.get("authorization_type", "NONE"), - auth_key=notification.get("authorization_key"), - auth_header=notification.get("authorization_header"), - max_retries=notification.get("max_retries", 0), - platform=notification.get("platform"), - ) - else: - logger.debug( - f"Skipping non-webhook notification type: {notification.get('notification_type')}" - ) + _route_notification(api_client, notification, notification_payload) except Exception as e: logger.error(f"Error triggering pipeline notifications for {pipeline_id}: {e}") @@ -176,10 +173,14 @@ def trigger_pipeline_notifications( return try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend can drop notify_on_failures=True rows + # on success runs. Without it the endpoint is a no-op and we'd fire on + # every active row regardless of trigger preference. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -204,7 +205,9 @@ def trigger_pipeline_notifications( else: workflow_type = WorkflowType.ETL # Default fallback - # Create notification payload using dataclass + # File counts come from WorkflowExecution via the same endpoint so + # webhook receivers (Slack, raw API) see partial-success breakdowns. + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=pipeline_id, pipeline_name=pipeline_name, @@ -213,6 +216,9 @@ def trigger_pipeline_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( @@ -221,20 +227,7 @@ def trigger_pipeline_notifications( # Send each notification for notification in active_notifications: - if notification.get("notification_type") == "WEBHOOK": - send_notification_to_worker( - url=notification["url"], - payload=payload, - auth_type=notification.get("authorization_type", "NONE"), - auth_key=notification.get("authorization_key"), - auth_header=notification.get("authorization_header"), - max_retries=notification.get("max_retries", 0), - platform=notification.get("platform"), - ) - else: - logger.debug( - f"Skipping non-webhook notification type: {notification.get('notification_type')}" - ) + _route_notification(api_client, notification, payload) except Exception as e: logger.error(f"Error triggering pipeline notifications for {pipeline_id}: {e}") @@ -261,9 +254,14 @@ def trigger_api_notifications( return try: - # Fetch API notifications via API + # See trigger_pipeline_notifications: execution_id powers the backend + # filter that respects notify_on_failures. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( - method="GET", endpoint=f"v1/webhook/api/{api_id}/notifications/", timeout=10 + method="GET", + endpoint=f"v1/webhook/api/{api_id}/notifications/", + params=params, + timeout=10, ) # _make_request already handles status codes and returns parsed data @@ -277,7 +275,7 @@ def trigger_api_notifications( logger.info(f"No active notifications found for API {api_id}") return - # Create notification payload using dataclass + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=api_id, pipeline_name=api_name, @@ -286,6 +284,9 @@ def trigger_api_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( @@ -294,27 +295,14 @@ def trigger_api_notifications( # Send each notification for notification in active_notifications: - if notification.get("notification_type") == "WEBHOOK": - send_notification_to_worker( - url=notification["url"], - payload=payload, - auth_type=notification.get("authorization_type", "NONE"), - auth_key=notification.get("authorization_key"), - auth_header=notification.get("authorization_header"), - max_retries=notification.get("max_retries", 0), - platform=notification.get("platform"), - ) - else: - logger.debug( - f"Skipping non-webhook notification type: {notification.get('notification_type')}" - ) + _route_notification(api_client, notification, payload) except Exception as e: logger.error(f"Error triggering API notifications for {api_id}: {e}") def handle_status_notifications( - api_client, + api_client: Any, pipeline_id: str, status: str, execution_id: str | None = None,