Skip to content

Populate timer origin field#976

Open
acroca wants to merge 11 commits intodapr:mainfrom
acroca:timers-origin
Open

Populate timer origin field#976
acroca wants to merge 11 commits intodapr:mainfrom
acroca:timers-origin

Conversation

@acroca
Copy link
Copy Markdown
Member

@acroca acroca commented Apr 10, 2026

  • Rebuilt latest protos
  • Populate the new origin field in timers.
  • Introduce automatic timer for wait_for_external_event to create a timer associated with such event. Other SDKs follow this same approach.

Signed-off-by: Albert Callarisa <albert@diagrid.io>
@acroca acroca requested review from a team as code owners April 10, 2026 13:13
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the durable workflow SDK to populate the newly added protobuf origin oneof on timer actions/events, and extends wait_for_external_event to support optional timeouts by internally scheduling a timer (or a far-future timer when no timeout is provided), aligning behavior with other SDKs.

Changes:

  • Add a timeout parameter to wait_for_external_event across the public context APIs and implement it via a composite task that races the external event against a timer.
  • Populate timer origin metadata for user timers (create_timer), external-event wait timers, and retry delay timers (activity + child workflow).
  • Rebuild protobuf Python outputs/stubs and add/adjust unit tests for the new timer-origin behavior.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py Updates existing external-event tests for the new “auto timer” behavior and adds new assertions/tests around timer origin fields.
ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py Extends the public workflow context interface to support wait_for_external_event(..., timeout=...).
ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py Threads the new timeout keyword through the public wrapper context to the underlying durable task context.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py Sets timer origins for create-timer and retry timers, and implements external-event timeout/far-future timer scheduling.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py Adds ExternalEventWithTimeoutTask composite task and updates the orchestration context contract for the new timeout parameter.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py Extends timer action helper to populate the correct origin oneof field.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi Regenerated protobuf typing stubs for new timer origin fields.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py Regenerated protobuf runtime code for new timer origin fields.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi Regenerated protobuf typing stubs for new timer origin fields on history events.
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py Regenerated protobuf runtime code for new timer origin fields on history events.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

acroca added 2 commits April 13, 2026 17:10
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

acroca added 2 commits April 13, 2026 17:19
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Signed-off-by: Albert Callarisa <albert@diagrid.io>
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 13, 2026

Codecov Report

❌ Patch coverage is 87.87879% with 68 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.93%. Comparing base (bffb749) to head (7805f97).
⚠️ Report is 99 commits behind head on main.

Files with missing lines Patch % Lines
...rkflow/_durabletask/internal/history_events_pb2.py 3.70% 26 Missing ⚠️
...w/tests/durabletask/test_orchestration_executor.py 94.35% 22 Missing ⚠️
.../_durabletask/internal/orchestrator_actions_pb2.py 8.33% 11 Missing ⚠️
...xt-workflow/dapr/ext/workflow/_durabletask/task.py 83.33% 6 Missing ⚠️
...dapr/ext/workflow/_durabletask/internal/helpers.py 96.87% 1 Missing ⚠️
...-workflow/dapr/ext/workflow/_durabletask/worker.py 98.27% 1 Missing ⚠️
...orkflow/dapr/ext/workflow/dapr_workflow_context.py 50.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #976      +/-   ##
==========================================
- Coverage   86.63%   80.93%   -5.70%     
==========================================
  Files          84      136      +52     
  Lines        4473    13090    +8617     
==========================================
+ Hits         3875    10594    +6719     
- Misses        598     2496    +1898     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

acroca added 2 commits April 13, 2026 17:33
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Signed-off-by: Albert Callarisa <albert@diagrid.io>
Copy link
Copy Markdown
Contributor

@sicoyle sicoyle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice job 👏 👏 👏 🙌 few comments so far and I've reviewed mostttt files

import dapr.ext.workflow._durabletask.internal.protos as pb
from google.protobuf import timestamp_pb2, wrappers_pb2

TimerOrigin = Union[
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you mv these changes into a timer.py file instead of adding to the helpers.py pls

Comment on lines +42 to +79
def is_optional_timer_action(action: pb.WorkflowAction) -> bool:
"""Returns True if the action is an optional TimerOriginExternalEvent timer
with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event.

Pre-patch histories (from prior SDK versions that didn't schedule a timer for
indefinite waits) won't carry a matching TimerCreatedEvent; the replay logic
uses this check to drop the optional action and shift sequence ids.
"""
if not action.HasField('createTimer'):
return False
timer = action.createTimer
if timer.WhichOneof('origin') != 'externalEvent':
return False
return (
timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds
and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos
)


def is_optional_timer_event(event: pb.HistoryEvent) -> bool:
"""Returns True if a TimerCreatedEvent is the optional sentinel timer.

For replay compatibility, treat a timerCreated event with the sentinel
fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when
reading histories emitted by older sidecars that didn't populate it). When
``origin`` *is* populated, it must match TimerOriginExternalEvent.
"""
if not event.HasField('timerCreated'):
return False
timer = event.timerCreated
if (
timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds
or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos
):
return False
origin = timer.WhichOneof('origin')
return origin in (None, 'externalEvent')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these look almost like the same func... can we create a helper for the same logic between the two?

Comment on lines +56 to +57
timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds
and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will both always be set?

Comment on lines +73 to +74
timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds
or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this one or if the func above uses and for this check?

Comment on lines 106 to +281
@@ -202,10 +272,20 @@ def new_workflow_version_not_available_action(
)


def new_create_timer_action(id: int, fire_at: datetime) -> pb.WorkflowAction:
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(fire_at)
return pb.WorkflowAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))
def new_create_timer_action(
id: int,
fire_at: Union[datetime, timestamp_pb2.Timestamp],
origin: Optional[TimerOrigin] = None,
) -> pb.WorkflowAction:
if isinstance(fire_at, timestamp_pb2.Timestamp):
timestamp = fire_at
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consistency between timer_id and id params between the two funcs and also vars within on the ts vs timestamp for the same thing pls


DESCRIPTOR: _descriptor.Descriptor

TASKEXECUTIONID_FIELD_NUMBER: _builtins.int
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TASKEXECUTIONID_FIELD_NUMBER: _builtins.int
TASK_EXECUTION_ID_FIELD_NUMBER: _builtins.int

why do we need to track the field number for this? can it just be TASK_EXECUTION_ID instead?

Comment on lines +182 to +189
* ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely.
An optional sentinel timer is scheduled internally for runtime
tracking, but ``TimeoutError`` is never raised on its own.
* ``timedelta(0)`` — do not wait at all. The returned task fails
immediately with ``TimeoutError``.
* A future ``datetime`` or a positive ``timedelta`` — wait until
that deadline / for that duration; ``TimeoutError`` is raised if
the event has not been received in time.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which of these does plain 0 fall under? How do these compare to the other SDK timeout field defaults? How does this compare to other workflow providers for default behaviors?

wait_for_external_event being canceled before any history event arrives.
"""
if self._is_complete:
raise ValueError('The task has already completed.')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this instead be a noop?

if self.is_complete:
return
if completed_task is self._event_task:
if completed_task.is_failed:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is missing a few other terminal states i believe in the if conditional check

Comment on lines +561 to +567
if self._timeout is not None:
msg = (
f'Timed out after {self._timeout!r} waiting for '
f'external event {self._event_name!r}'
)
else:
msg = f'Timed out waiting for external event {self._event_name!r}'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these just be combined instead of split in an if else?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants