Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
60d4816
api deployment notification init
kirtimanmishrazipstack Apr 24, 2026
207277e
UN-3431 [FIX] Stream tool-run logs to workflow execution UI with mark…
chandrasekharan-zipstack Apr 24, 2026
29bdf64
UN-3430 [FIX] Update modified_at field correctly for models (#1928)
chandrasekharan-zipstack Apr 24, 2026
fd532be
UN-3403 [FEAT] Agentic table extractor plugin with multi-agent LLM-po…
harini-venkataraman Apr 28, 2026
5d591dc
UN-3358 [FIX] Drop cross-region S3 buckets from connector listing (#1…
kirtimanmishrazipstack Apr 28, 2026
6ad5fa9
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack Apr 29, 2026
019f33c
payload metadata in api deployment
kirtimanmishrazipstack May 5, 2026
9cd8eb1
slack webhook payload
kirtimanmishrazipstack May 5, 2026
968fdbd
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 7, 2026
aad0fa9
Uns 611 clubbed notification dispatch (#1951)
kirtimanmishrazipstack May 7, 2026
52e8abf
Merge branch 'UN-3056-Notify-on-API-deployment-failures' of github.co…
kirtimanmishrazipstack May 7, 2026
7d455c1
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 7, 2026
0a5b5bb
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 8, 2026
d98f580
Merge branch 'main' into UN-3056-Notify-on-API-deployment-failures
kirtimanmishrazipstack May 12, 2026
b8bf719
Uns 611 clubbed notification dispatch (#1959)
kirtimanmishrazipstack May 12, 2026
37930d2
notification API
kirtimanmishrazipstack May 12, 2026
f8052db
delivery mode batch by default
kirtimanmishrazipstack May 13, 2026
8798737
UI change
kirtimanmishrazipstack May 13, 2026
33d77e9
Merge branch 'main' into UN-3056-Notify-on-API-deployment-failures
kirtimanmishrazipstack May 13, 2026
50917e8
PR reviews
kirtimanmishrazipstack May 13, 2026
f75a4ec
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 13, 2026
92ad063
sonar issues
kirtimanmishrazipstack May 13, 2026
e6a87e4
sonar issues
kirtimanmishrazipstack May 13, 2026
a736bf8
code rabbit refactor
kirtimanmishrazipstack May 13, 2026
e653494
greptile comments resolve
kirtimanmishrazipstack May 13, 2026
724d280
UN-3056 Scope enqueue execution_id exemption to INPROGRESS
kirtimanmishrazipstack May 13, 2026
4104faf
greptile comments resolve
kirtimanmishrazipstack May 13, 2026
167d60f
UN-3056 Skip deactivated notifications in BATCHED flush
kirtimanmishrazipstack May 13, 2026
77b7956
greptile comments resolve
kirtimanmishrazipstack May 13, 2026
780d51c
remove immediate mode
kirtimanmishrazipstack May 13, 2026
743e9a9
add legacy code
kirtimanmishrazipstack May 13, 2026
ba04d58
add legacy code
kirtimanmishrazipstack May 13, 2026
b1fd243
greptile review
kirtimanmishrazipstack May 13, 2026
51910ab
Merge branch 'main' into UN-3056-Notify-on-API-deployment-failures
kirtimanmishrazipstack May 14, 2026
fb47c4f
greptile review
kirtimanmishrazipstack May 14, 2026
99d3653
Merge branch 'UN-3056-Notify-on-API-deployment-failures' of github.co…
kirtimanmishrazipstack May 14, 2026
1c10646
Merge branch 'main' into UN-3056-Notify-on-API-deployment-failures
kirtimanmishrazipstack May 20, 2026
c113aca
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 22, 2026
b8699f4
UI as per new designs
kirtimanmishrazipstack May 22, 2026
5defa6e
Merge branch 'main' of github.com:Zipstack/unstract into UN-3056-Noti…
kirtimanmishrazipstack May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions backend/api_v2/notification.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from notification_v2.helper import NotificationHelper
from notification_v2.enums import FAILURE_STATUSES
from notification_v2.helper import dispatch_with_delivery_mode
from notification_v2.models import Notification
from pipeline_v2.dto import PipelineStatusPayload
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
Expand All @@ -16,11 +17,33 @@ def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) ->
self.api = api
self.workflow_execution = workflow_execution

def send(self):
if not self.notifications.count():
logger.info(f"No notifications found for api {self.api}")
def send(self) -> None:
# Failure if the run hit a non-success terminal state OR any file errored.
# Partial-success runs land as status=COMPLETED with failed_files>0, so the
# status check alone misses them — see callback aggregation rules.
failed_files = self.workflow_execution.failed_files or 0
is_failure = (
self.workflow_execution.status in FAILURE_STATUSES or failed_files > 0
)
if not is_failure:
# Success path: skip rows that opted into failure-only alerts.
self.notifications = self.notifications.filter(notify_on_failures=False)

if not self.notifications.exists():
logger.info(
"No notifications to dispatch for api %s (status=%s, failed_files=%s)",
self.api,
self.workflow_execution.status,
failed_files,
)
return
logger.info(f"Sending api status notification for api {self.api}")
logger.info(
"Sending api status notification for api %s (status=%s, successful=%s, failed=%s)",
self.api,
self.workflow_execution.status,
self.workflow_execution.successful_files or 0,
failed_files,
)

payload_dto = PipelineStatusPayload(
type="API",
Expand All @@ -29,8 +52,13 @@ def send(self):
status=self.workflow_execution.status,
execution_id=self.workflow_execution.id,
error_message=self.workflow_execution.error_message,
total_files=self.workflow_execution.total_files,
successful_files=self.workflow_execution.successful_files,
failed_files=failed_files,
)

NotificationHelper.send_notification(
notifications=self.notifications, payload=payload_dto.to_dict()
dispatch_with_delivery_mode(
list(self.notifications),
payload_dto.to_dict(),
error_context=f"api={self.api.id}",
)
9 changes: 9 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |

INDEXING_FLAG_TTL = int(get_required_setting("INDEXING_FLAG_TTL"))
NOTIFICATION_TIMEOUT = int(get_required_setting("NOTIFICATION_TIMEOUT", "5"))
# Window for clubbing BATCHED notifications — also the flush cadence (seconds).
# Default 300 (5 min). Per-notification buffer rows precompute flush_after at
# enqueue time, so changing this only affects rows enqueued after the restart.
NOTIFICATION_CLUB_INTERVAL = int(os.environ.get("NOTIFICATION_CLUB_INTERVAL", "300"))
# Retention for terminal NotificationBuffer rows (DISPATCHED / DEAD_LETTER).
# PENDING rows are never GC'd regardless of age.
NOTIFICATION_BUFFER_RETENTION_DAYS = int(
os.environ.get("NOTIFICATION_BUFFER_RETENTION_DAYS", "7")
)
ATOMIC_REQUESTS = CommonUtils.str_to_bool(
os.environ.get("DJANGO_ATOMIC_REQUESTS", "False")
)
Expand Down
8 changes: 8 additions & 0 deletions backend/configuration/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class ConfigKey(Enum):
max_value=settings.MAX_PARALLEL_FILE_BATCHES_MAX_VALUE,
)

NOTIFICATION_CLUB_INTERVAL = ConfigSpec(
default=settings.NOTIFICATION_CLUB_INTERVAL,
value_type=ConfigType.INT,
help_text="Window (seconds) for clubbing BATCHED notifications.",
min_value=60,
max_value=7200,
)

def cast_value(self, raw_value: Any):
converters = {
ConfigType.INT: int,
Expand Down
52 changes: 52 additions & 0 deletions backend/notification_v2/clubbed_renderer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Backend dispatch entry for clubbed-notification rendering.

Delegates the canonical envelope + Slack body to
``unstract.core.notification_clubbed_renderer`` so backend and worker
callbacks emit byte-identical receiver-visible payloads. This thin shim
keeps the ``render_clubbed_message`` platform dispatcher (uses
``PlatformType`` enum) backend-side; everything else lives in the shared
module.
"""

from __future__ import annotations

import logging
from typing import Any

from notification_v2.enums import PlatformType
from unstract.core.notification_clubbed_renderer import (
build_envelope,
render_slack_text,
)

logger = logging.getLogger(__name__)

__all__ = ["build_envelope", "render_clubbed_message"]


def _render_for_slack(envelope: dict[str, Any]) -> dict[str, Any]:
"""Wrap the rendered Slack mrkdwn body in the dict shape Slack expects."""
return {"text": render_slack_text(envelope)}


def render_clubbed_message(
payloads: list[dict[str, Any]],
platform: str,
) -> dict[str, Any]:
"""Top-level entry — returns the dispatch body for ``platform``.

Used by every dispatch site so the receiver-visible payload is
identical regardless of caller.
"""
envelope = build_envelope(payloads)
if platform == PlatformType.SLACK.value:
return _render_for_slack(envelope)
if platform == PlatformType.API.value:
return envelope
# Unknown platform — fall back to the raw envelope and warn so misrouted
# rows don't drop silently.
logger.warning(
"Unknown platform %s for clubbed dispatch; returning raw envelope",
platform,
)
return envelope
45 changes: 45 additions & 0 deletions backend/notification_v2/enums.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from enum import Enum

from workflow_manager.workflow_v2.enums import ExecutionStatus

# Single source of truth for "did this run fail for notification routing?".
# STOPPED is intentionally a failure here per migrations/0002_…notify_on_failures
# db_comment ("terminal status ERROR/STOPPED or any file in the run errored").
FAILURE_STATUSES = frozenset({ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value})


class NotificationType(Enum):
WEBHOOK = "WEBHOOK"
Expand Down Expand Up @@ -36,3 +43,41 @@ class PlatformType(Enum):
@classmethod
def choices(cls):
return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls]


class DeliveryMode(Enum):
"""Per-notification dispatch mode.

Product ships every notification as BATCHED — events buffer into
``NotificationBuffer`` and flush as one clubbed message per
(org, webhook_url, auth_sig) every ``NOTIFICATION_CLUB_INTERVAL`` seconds.

The ``IMMEDIATE`` value is purely a historical DB value — no code reads
it anymore (the legacy synchronous-dispatch path was removed). The
column and enum value remain so existing rows don't break; both will be
dropped in a follow-up schema migration.
"""

IMMEDIATE = "IMMEDIATE"
BATCHED = "BATCHED"

@classmethod
def choices(cls):
return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls]


class BufferStatus(Enum):
"""Lifecycle states for a NotificationBuffer row.

PENDING — waiting for the next flush tick.
DISPATCHED — successfully sent as part of a clubbed message.
DEAD_LETTER — Celery exhausted retries; terminal, never re-picked.
"""

PENDING = "PENDING"
DISPATCHED = "DISPATCHED"
DEAD_LETTER = "DEAD_LETTER"

@classmethod
def choices(cls):
return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls]
Loading