Description
When deadline callbacks run in the triggerer process, connection lookups via the async path (Connection.async_get() → _async_get_connection()) fail with AirflowNotFoundException for connection IDs that do exist in the Airflow metadata database (and are visible via airflow connections get). The same connections resolve successfully from the scheduler when using the sync CLI, but the async path used by deadline callback notifiers (e.g. PagerDuty, SMTP) does not see them in the triggerer context.
This causes deadline alerts (e.g. "notify on DAG run timeout") to fail with "The conn_id X isn't defined" even though the connection is defined in the Airflow UI / metastore.
Use case / impact
- Users configure DAG-level deadline callbacks (e.g.
dag=DatasetTriggeredDAG(..., deadline=[pagerduty_deadline_alert(...)])) to get PagerDuty/email alerts when a run exceeds a time limit.
- The callback runs in the triggerer. It calls the provider’s async hook (e.g. PagerDuty), which calls
get_async_connection(conn_id) → BaseHook.aget_connection() → Connection.async_get() → _async_get_connection() in airflow/sdk/execution_time/context.py.
_async_get_connection() uses ensure_secrets_backend_loaded() to decide which backends to query. In the triggerer process, the effective context is the fallback chain (no SUPERVISOR_COMMS, and triggerer does not set _AIRFLOW_PROCESS_CONTEXT=server), so only EnvironmentVariablesBackend and any configured external backends (e.g. AWS Secrets Manager) are used—MetastoreBackend is not included.
- Connections stored only in the metadata DB (e.g. created/edited in the Airflow UI) are therefore never found by the async path in the triggerer, and the callback raises
AirflowNotFoundException.
What you expected to happen
Connections that exist in the Airflow metadata database (and are visible via airflow connections get) should be resolvable when deadline callbacks run in the triggerer, so that notifiers (PagerDuty, SMTP, etc.) can use the same connection configuration as the rest of the deployment.
What actually happened
- Scheduler (sync path):
airflow connections get <conn_id> succeeds and returns the connection (metastore is used by the sync path / server context).
- Triggerer (async path): Inside the triggerer container,
Connection.async_get(conn_id) for the same conn_id raises AirflowNotFoundException: The conn_id '<conn_id>' isn't defined.
- Deadline callbacks that use these connection IDs (e.g.
pagerduty_events_conn_id, smtp_conn_id) fail with the same exception when the triggerer runs the callback.
How to reproduce
- Deploy Airflow 3.x with a triggerer and a secrets backend configuration that includes both:
- MetastoreBackend (default for “server” context), and
- An optional external backend (e.g. AWS Secrets Manager).
- Create a connection in the Airflow UI (metastore only), e.g.
my_pagerduty (type pagerduty_events).
- Define a DAG with a deadline callback that uses that connection, e.g.:
deadline=[PagerDutyNotifier(pagerduty_events_conn_id="my_pagerduty", ...)]
- Trigger a run that hits the deadline so the triggerer executes the callback.
- Observe: the callback fails with
AirflowNotFoundException: The conn_id 'my_pagerduty' isn't defined.
In-container check (triggerer):
# In triggerer container (same process context as deadline callbacks):
from airflow.sdk.definitions.connection import Connection
import asyncio
asyncio.run(Connection.async_get("my_pagerduty")) # -> AirflowNotFoundException
CLI in same container:
airflow connections get my_pagerduty # -> returns connection (CLI may use different backend resolution)
So the async code path used by the triggerer does not see metastore-backed connections.
Environment
- Airflow version: 3.1.7
- Runtime: Astronomer Astro (Runtime 3.1-13); triggerer runs as separate K8s deployment.
- Secrets:
AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend with backend kwargs; connections also exist in metastore (Airflow UI).
- Components: Scheduler (sync/CLI can see metastore); Triggerer (async path used by deadline callbacks cannot).
Code references
- Async connection lookup used by deadline callbacks:
airflow/task-sdk/src/airflow/sdk/execution_time/context.py — _async_get_connection() (lines ~180–225). It calls ensure_secrets_backend_loaded() and iterates only over the returned backends; if none return the connection, it raises AirflowNotFoundException.
- Backend selection:
airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py — ensure_secrets_backend_loaded() (around 1933–1975).
- If
_AIRFLOW_PROCESS_CONTEXT=server: uses default server chain (includes MetastoreBackend).
- Otherwise (e.g. triggerer): uses “fallback” chain: only
EnvironmentVariablesBackend plus configured external backends; MetastoreBackend is not in the list.
- Provider call path:
airflow/providers/pagerduty/hooks/pagerduty_events.py — get_integration_key() calls get_async_connection(self.pagerduty_events_conn_id) (around line 288). Same pattern for SMTP and other notifiers used in deadline callbacks.
Possible solutions
- Treat triggerer as server-like for secrets: When loading secrets in the process that runs the triggerer job, include MetastoreBackend in the backend chain (e.g. by setting
_AIRFLOW_PROCESS_CONTEXT=server for the triggerer process or by explicitly including metastore in the “fallback” chain for triggerer).
- Document the gap: If the current behavior is intentional (e.g. triggerer is considered a “worker” context), document that connections used by deadline callbacks must be available from an external secrets backend (e.g. AWS Secrets Manager), not only from the Airflow UI/metastore.
- Unify behavior: Ensure that any context that runs user callbacks (including triggerer) can resolve connections from the same sources as the scheduler/webserver (including metastore), so that UI-defined connections work for deadline notifiers without requiring duplication in an external backend.
Additional context
- Multiple connection IDs observed failing in the same way: PagerDuty conns and
smtp_default (referenced by AIRFLOW__EMAIL__EMAIL_CONN_ID). All exist in metastore and are visible via airflow connections get from scheduler/triggerer CLI, but Connection.async_get() fails in the triggerer process.
- Stack trace from logs points to:
airflow/triggers/deadline.py → notifier async_notify() → provider hook get_integration_key() / aget_connection() → Connection.async_get() → _async_get_connection() → raise AirflowNotFoundException.
Description
When deadline callbacks run in the triggerer process, connection lookups via the async path (
Connection.async_get()→_async_get_connection()) fail withAirflowNotFoundExceptionfor connection IDs that do exist in the Airflow metadata database (and are visible viaairflow connections get). The same connections resolve successfully from the scheduler when using the sync CLI, but the async path used by deadline callback notifiers (e.g. PagerDuty, SMTP) does not see them in the triggerer context.This causes deadline alerts (e.g. "notify on DAG run timeout") to fail with "The conn_id
Xisn't defined" even though the connection is defined in the Airflow UI / metastore.Use case / impact
dag=DatasetTriggeredDAG(..., deadline=[pagerduty_deadline_alert(...)])) to get PagerDuty/email alerts when a run exceeds a time limit.get_async_connection(conn_id)→BaseHook.aget_connection()→Connection.async_get()→_async_get_connection()inairflow/sdk/execution_time/context.py._async_get_connection()usesensure_secrets_backend_loaded()to decide which backends to query. In the triggerer process, the effective context is the fallback chain (noSUPERVISOR_COMMS, and triggerer does not set_AIRFLOW_PROCESS_CONTEXT=server), so onlyEnvironmentVariablesBackendand any configured external backends (e.g. AWS Secrets Manager) are used—MetastoreBackend is not included.AirflowNotFoundException.What you expected to happen
Connections that exist in the Airflow metadata database (and are visible via
airflow connections get) should be resolvable when deadline callbacks run in the triggerer, so that notifiers (PagerDuty, SMTP, etc.) can use the same connection configuration as the rest of the deployment.What actually happened
airflow connections get <conn_id>succeeds and returns the connection (metastore is used by the sync path / server context).Connection.async_get(conn_id)for the sameconn_idraisesAirflowNotFoundException: The conn_id '<conn_id>' isn't defined.pagerduty_events_conn_id,smtp_conn_id) fail with the same exception when the triggerer runs the callback.How to reproduce
my_pagerduty(typepagerduty_events).deadline=[PagerDutyNotifier(pagerduty_events_conn_id="my_pagerduty", ...)]AirflowNotFoundException: The conn_id 'my_pagerduty' isn't defined.In-container check (triggerer):
CLI in same container:
airflow connections get my_pagerduty # -> returns connection (CLI may use different backend resolution)So the async code path used by the triggerer does not see metastore-backed connections.
Environment
AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackendwith backend kwargs; connections also exist in metastore (Airflow UI).Code references
airflow/task-sdk/src/airflow/sdk/execution_time/context.py—_async_get_connection()(lines ~180–225). It callsensure_secrets_backend_loaded()and iterates only over the returned backends; if none return the connection, it raisesAirflowNotFoundException.airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py—ensure_secrets_backend_loaded()(around 1933–1975)._AIRFLOW_PROCESS_CONTEXT=server: uses default server chain (includes MetastoreBackend).EnvironmentVariablesBackendplus configured external backends; MetastoreBackend is not in the list.airflow/providers/pagerduty/hooks/pagerduty_events.py—get_integration_key()callsget_async_connection(self.pagerduty_events_conn_id)(around line 288). Same pattern for SMTP and other notifiers used in deadline callbacks.Possible solutions
_AIRFLOW_PROCESS_CONTEXT=serverfor the triggerer process or by explicitly including metastore in the “fallback” chain for triggerer).Additional context
smtp_default(referenced byAIRFLOW__EMAIL__EMAIL_CONN_ID). All exist in metastore and are visible viaairflow connections getfrom scheduler/triggerer CLI, butConnection.async_get()fails in the triggerer process.airflow/triggers/deadline.py→ notifierasync_notify()→ provider hookget_integration_key()/aget_connection()→Connection.async_get()→_async_get_connection()→ raiseAirflowNotFoundException.