Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator#67382
Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator#67382SameerMesiah97 wants to merge 1 commit into
Conversation
db25926 to
f465a78
Compare
Prevent framework-level deferred timeouts from bypassing execute_complete() cancellation handling for Airbyte jobs.
f465a78 to
7a3d3bd
Compare
| self.log.debug("Running in deferrable mode in job state %s...", state) | ||
| if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE): | ||
| self.defer( | ||
| timeout=self.execution_timeout, |
There was a problem hiding this comment.
I think there is an edge case here. If execution_timeout=30s, but the Airbyte job reaches SUCCEEDED after 45s, the trigger can still mark the task as successful because it checks the Airbyte job status before checking execution_deadline:
After this PR changes defer(timeout=None), the framework-level deferred timeout no longer catches this case. Should the trigger check execution_deadline before accepting SUCCEEDED / CANCELLED, so execution_timeout remains a hard task-level limit?
There was a problem hiding this comment.
I agree. This is a very valid edge case but this PR was submitted on short notice to fix a resource leak before an RC2 can be cut for the airbyte provider. For the fix in this PR to land in RC2, it has to be merged by 2026-05-26 so the scope is purposely being kept very tight so that even a generalist reviewer can confidently sign off on it. I believe it might be better for me to open another PR to address your comment. If you disagree with my reasoning, you are free to express yourself here.
Description
This change fixes an issue (as reported in Issue #64048) affecting
AirbyteTriggerSyncOperatorin deferrable mode that was not fully resolved by PR #64051.Previously,
execution_timeoutwas passed directly todefer(). The operator now explicitly passestimeout=Nonetodefer()while still preservingexecution_deadlinehandling within the trigger/operator flow.Rationale
In deferrable mode, Airbyte job cancellation is performed within
execute_complete()when the trigger emits a timeout event.However, framework-level deferred timeout handling bypasses
execute_complete()entirely and does not invokeon_kill()in the triggerer process. As a result, Airbyte jobs could continue running after the Airflow task timed out, leading to leaked workloads and excessive resource consumption.While this weakens the framework-level deferred timeout guarantee, in this case preventing leaked external workloads takes precedence because there is currently no equivalent cleanup path in the triggerer process.
This change keeps timeout handling within the trigger/operator flow so Airbyte job cancellation can occur correctly.
Tests
Added an operator-level regression test verifying that
defer()is called withtimeout=Nonewhileexecution_deadlinehandling remains preserved for Airbyte job timeout cancellation processing.Backwards Compatibility
This change does not modify public APIs or method signatures.
Related: #64048
Was generative AI tooling used to co-author this PR?
Generated-by: [GPT 5.5] following the guidelines