From fae65e92d9bf7d6fa5a8b3e6088c20619c218802 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Fri, 10 Apr 2026 14:53:49 +0200 Subject: [PATCH 01/13] Populate timer origin field Signed-off-by: Albert Callarisa --- .../workflow/_durabletask/internal/helpers.py | 25 +- .../internal/history_events_pb2.py | 50 ++- .../internal/history_events_pb2.pyi | 52 ++- .../internal/orchestrator_actions_pb2.py | 24 +- .../internal/orchestrator_actions_pb2.pyi | 14 +- .../dapr/ext/workflow/_durabletask/task.py | 40 +- .../dapr/ext/workflow/_durabletask/worker.py | 42 +- .../ext/workflow/dapr_workflow_context.py | 9 +- .../dapr/ext/workflow/workflow_context.py | 13 +- .../test_orchestration_executor.py | 411 +++++++++++++++++- 10 files changed, 618 insertions(+), 62 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index ebfb3402..0f63cde9 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -11,11 +11,25 @@ import traceback from datetime import datetime -from typing import Optional +from typing import Optional, Union import dapr.ext.workflow._durabletask.internal.protos as pb from google.protobuf import timestamp_pb2, wrappers_pb2 +TimerOrigin = Union[ + pb.TimerOriginCreateTimer, + pb.TimerOriginExternalEvent, + pb.TimerOriginActivityRetry, + pb.TimerOriginChildWorkflowRetry, +] + +_ORIGIN_FIELD: dict[type, str] = { + pb.TimerOriginCreateTimer: 'createTimer', + pb.TimerOriginExternalEvent: 'externalEvent', + pb.TimerOriginActivityRetry: 'activityRetry', + pb.TimerOriginChildWorkflowRetry: 'childWorkflowRetry', +} + # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere @@ -202,10 +216,15 @@ def new_workflow_version_not_available_action( ) -def new_create_timer_action(id: int, fire_at: datetime) -> pb.WorkflowAction: +def new_create_timer_action( + id: int, fire_at: datetime, origin: Optional[TimerOrigin] = None +) -> pb.WorkflowAction: timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(fire_at) - return pb.WorkflowAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp)) + origin_kwargs = {_ORIGIN_FIELD[type(origin)]: origin} if origin is not None else {} + return pb.WorkflowAction( + id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp, **origin_kwargs) + ) def new_schedule_task_action( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py index 7a5dea47..feb7e313 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py @@ -27,7 +27,7 @@ from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14history_events.proto\x1a\x13orchestration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xd6\x03\n\x15\x45xecutionStartedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12+\n\x0eparentInstance\x18\x05 \x01(\x0b\x32\x13.ParentInstanceInfo\x12;\n\x17scheduledStartTimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12)\n\x12parentTraceContext\x18\x07 \x01(\x0b\x32\r.TraceContext\x12\x34\n\x0eworkflowSpanID\x18\x08 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\t \x03(\x0b\x32 .ExecutionStartedEvent.TagsEntry\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa2\x01\n\x17\x45xecutionCompletedEvent\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x03 \x01(\x0b\x32\x13.TaskFailureDetails\"X\n\x18\x45xecutionTerminatedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x02 \x01(\x08\"\x9e\x02\n\x12TaskScheduledEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x04 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\t\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"t\n\x12TaskCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"p\n\x0fTaskFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"\xa8\x02\n!ChildWorkflowInstanceCreatedEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x05 \x01(\x0b\x32\r.TraceContext\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"l\n#ChildWorkflowInstanceCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"h\n ChildWorkflowInstanceFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\"\x18\n\x16TimerOriginCreateTimer\"(\n\x18TimerOriginExternalEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\"\xa5\x02\n\x11TimerCreatedEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12>\n\x17rerunParentInstanceInfo\x18\x03 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x02\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x05 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x42\x08\n\x06originB\x07\n\x05_nameB\x1a\n\x18_rerunParentInstanceInfo\"N\n\x0fTimerFiredEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07timerId\x18\x02 \x01(\x05\"J\n\x14WorkflowStartedEvent\x12&\n\x07version\x18\x01 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\x18\n\x16WorkflowCompletedEvent\"_\n\x0e\x45ventSentEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"M\n\x10\x45ventRaisedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x05input\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"A\n\x12\x43ontinueAsNewEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"F\n\x17\x45xecutionSuspendedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"D\n\x15\x45xecutionResumedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"a\n\x15\x45xecutionStalledEvent\x12\x1e\n\x06reason\x18\x01 \x01(\x0e\x32\x0e.StalledReason\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x0e\n\x0c_description\"\xa8\t\n\x0cHistoryEvent\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\x05\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x32\n\x10\x65xecutionStarted\x18\x03 \x01(\x0b\x32\x16.ExecutionStartedEventH\x00\x12\x36\n\x12\x65xecutionCompleted\x18\x04 \x01(\x0b\x32\x18.ExecutionCompletedEventH\x00\x12\x38\n\x13\x65xecutionTerminated\x18\x05 \x01(\x0b\x32\x19.ExecutionTerminatedEventH\x00\x12,\n\rtaskScheduled\x18\x06 \x01(\x0b\x32\x13.TaskScheduledEventH\x00\x12,\n\rtaskCompleted\x18\x07 \x01(\x0b\x32\x13.TaskCompletedEventH\x00\x12&\n\ntaskFailed\x18\x08 \x01(\x0b\x32\x10.TaskFailedEventH\x00\x12J\n\x1c\x63hildWorkflowInstanceCreated\x18\t \x01(\x0b\x32\".ChildWorkflowInstanceCreatedEventH\x00\x12N\n\x1e\x63hildWorkflowInstanceCompleted\x18\n \x01(\x0b\x32$.ChildWorkflowInstanceCompletedEventH\x00\x12H\n\x1b\x63hildWorkflowInstanceFailed\x18\x0b \x01(\x0b\x32!.ChildWorkflowInstanceFailedEventH\x00\x12*\n\x0ctimerCreated\x18\x0c \x01(\x0b\x32\x12.TimerCreatedEventH\x00\x12&\n\ntimerFired\x18\r \x01(\x0b\x32\x10.TimerFiredEventH\x00\x12\x30\n\x0fworkflowStarted\x18\x0e \x01(\x0b\x32\x15.WorkflowStartedEventH\x00\x12\x34\n\x11workflowCompleted\x18\x0f \x01(\x0b\x32\x17.WorkflowCompletedEventH\x00\x12$\n\teventSent\x18\x10 \x01(\x0b\x32\x0f.EventSentEventH\x00\x12(\n\x0b\x65ventRaised\x18\x11 \x01(\x0b\x32\x11.EventRaisedEventH\x00\x12,\n\rcontinueAsNew\x18\x14 \x01(\x0b\x32\x13.ContinueAsNewEventH\x00\x12\x36\n\x12\x65xecutionSuspended\x18\x15 \x01(\x0b\x32\x18.ExecutionSuspendedEventH\x00\x12\x32\n\x10\x65xecutionResumed\x18\x16 \x01(\x0b\x32\x16.ExecutionResumedEventH\x00\x12\x32\n\x10\x65xecutionStalled\x18\x1f \x01(\x0b\x32\x16.ExecutionStalledEventH\x00\x12 \n\x06router\x18\x1e \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x0b\n\teventTypeB\t\n\x07_routerJ\x04\x08\x12\x10\x13J\x04\x08\x13\x10\x14J\x04\x08\x17\x10\x18J\x04\x08\x18\x10\x19J\x04\x08\x19\x10\x1aJ\x04\x08\x1a\x10\x1bJ\x04\x08\x1b\x10\x1cJ\x04\x08\x1c\x10\x1dJ\x04\x08\x1d\x10\x1e\x42V\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14history_events.proto\x1a\x13orchestration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xd6\x03\n\x15\x45xecutionStartedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12+\n\x0eparentInstance\x18\x05 \x01(\x0b\x32\x13.ParentInstanceInfo\x12;\n\x17scheduledStartTimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12)\n\x12parentTraceContext\x18\x07 \x01(\x0b\x32\r.TraceContext\x12\x34\n\x0eworkflowSpanID\x18\x08 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\t \x03(\x0b\x32 .ExecutionStartedEvent.TagsEntry\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa2\x01\n\x17\x45xecutionCompletedEvent\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x03 \x01(\x0b\x32\x13.TaskFailureDetails\"X\n\x18\x45xecutionTerminatedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x02 \x01(\x08\"\x9e\x02\n\x12TaskScheduledEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x04 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\t\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"t\n\x12TaskCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"p\n\x0fTaskFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"\xa8\x02\n!ChildWorkflowInstanceCreatedEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x05 \x01(\x0b\x32\r.TraceContext\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"l\n#ChildWorkflowInstanceCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"h\n ChildWorkflowInstanceFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\"\x18\n\x16TimerOriginCreateTimer\"(\n\x18TimerOriginExternalEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\"3\n\x18TimerOriginActivityRetry\x12\x17\n\x0ftaskExecutionId\x18\x01 \x01(\t\"3\n\x1dTimerOriginChildWorkflowRetry\x12\x12\n\ninstanceId\x18\x01 \x01(\t\"\x97\x03\n\x11TimerCreatedEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12>\n\x17rerunParentInstanceInfo\x18\x03 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x02\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x05 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x12\x32\n\ractivityRetry\x18\x06 \x01(\x0b\x32\x19.TimerOriginActivityRetryH\x00\x12<\n\x12\x63hildWorkflowRetry\x18\x07 \x01(\x0b\x32\x1e.TimerOriginChildWorkflowRetryH\x00\x42\x08\n\x06originB\x07\n\x05_nameB\x1a\n\x18_rerunParentInstanceInfo\"N\n\x0fTimerFiredEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07timerId\x18\x02 \x01(\x05\"J\n\x14WorkflowStartedEvent\x12&\n\x07version\x18\x01 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\x18\n\x16WorkflowCompletedEvent\"_\n\x0e\x45ventSentEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"M\n\x10\x45ventRaisedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x05input\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"A\n\x12\x43ontinueAsNewEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"F\n\x17\x45xecutionSuspendedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"D\n\x15\x45xecutionResumedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"a\n\x15\x45xecutionStalledEvent\x12\x1e\n\x06reason\x18\x01 \x01(\x0e\x32\x0e.StalledReason\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x0e\n\x0c_description\"\xa8\t\n\x0cHistoryEvent\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\x05\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x32\n\x10\x65xecutionStarted\x18\x03 \x01(\x0b\x32\x16.ExecutionStartedEventH\x00\x12\x36\n\x12\x65xecutionCompleted\x18\x04 \x01(\x0b\x32\x18.ExecutionCompletedEventH\x00\x12\x38\n\x13\x65xecutionTerminated\x18\x05 \x01(\x0b\x32\x19.ExecutionTerminatedEventH\x00\x12,\n\rtaskScheduled\x18\x06 \x01(\x0b\x32\x13.TaskScheduledEventH\x00\x12,\n\rtaskCompleted\x18\x07 \x01(\x0b\x32\x13.TaskCompletedEventH\x00\x12&\n\ntaskFailed\x18\x08 \x01(\x0b\x32\x10.TaskFailedEventH\x00\x12J\n\x1c\x63hildWorkflowInstanceCreated\x18\t \x01(\x0b\x32\".ChildWorkflowInstanceCreatedEventH\x00\x12N\n\x1e\x63hildWorkflowInstanceCompleted\x18\n \x01(\x0b\x32$.ChildWorkflowInstanceCompletedEventH\x00\x12H\n\x1b\x63hildWorkflowInstanceFailed\x18\x0b \x01(\x0b\x32!.ChildWorkflowInstanceFailedEventH\x00\x12*\n\x0ctimerCreated\x18\x0c \x01(\x0b\x32\x12.TimerCreatedEventH\x00\x12&\n\ntimerFired\x18\r \x01(\x0b\x32\x10.TimerFiredEventH\x00\x12\x30\n\x0fworkflowStarted\x18\x0e \x01(\x0b\x32\x15.WorkflowStartedEventH\x00\x12\x34\n\x11workflowCompleted\x18\x0f \x01(\x0b\x32\x17.WorkflowCompletedEventH\x00\x12$\n\teventSent\x18\x10 \x01(\x0b\x32\x0f.EventSentEventH\x00\x12(\n\x0b\x65ventRaised\x18\x11 \x01(\x0b\x32\x11.EventRaisedEventH\x00\x12,\n\rcontinueAsNew\x18\x14 \x01(\x0b\x32\x13.ContinueAsNewEventH\x00\x12\x36\n\x12\x65xecutionSuspended\x18\x15 \x01(\x0b\x32\x18.ExecutionSuspendedEventH\x00\x12\x32\n\x10\x65xecutionResumed\x18\x16 \x01(\x0b\x32\x16.ExecutionResumedEventH\x00\x12\x32\n\x10\x65xecutionStalled\x18\x1f \x01(\x0b\x32\x16.ExecutionStalledEventH\x00\x12 \n\x06router\x18\x1e \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x0b\n\teventTypeB\t\n\x07_routerJ\x04\x08\x12\x10\x13J\x04\x08\x13\x10\x14J\x04\x08\x17\x10\x18J\x04\x08\x18\x10\x19J\x04\x08\x19\x10\x1aJ\x04\x08\x1a\x10\x1bJ\x04\x08\x1b\x10\x1cJ\x04\x08\x1c\x10\x1dJ\x04\x08\x1d\x10\x1e\x42V\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -61,26 +61,30 @@ _globals['_TIMERORIGINCREATETIMER']._serialized_end=1898 _globals['_TIMERORIGINEXTERNALEVENT']._serialized_start=1900 _globals['_TIMERORIGINEXTERNALEVENT']._serialized_end=1940 - _globals['_TIMERCREATEDEVENT']._serialized_start=1943 - _globals['_TIMERCREATEDEVENT']._serialized_end=2236 - _globals['_TIMERFIREDEVENT']._serialized_start=2238 - _globals['_TIMERFIREDEVENT']._serialized_end=2316 - _globals['_WORKFLOWSTARTEDEVENT']._serialized_start=2318 - _globals['_WORKFLOWSTARTEDEVENT']._serialized_end=2392 - _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_start=2394 - _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_end=2418 - _globals['_EVENTSENTEVENT']._serialized_start=2420 - _globals['_EVENTSENTEVENT']._serialized_end=2515 - _globals['_EVENTRAISEDEVENT']._serialized_start=2517 - _globals['_EVENTRAISEDEVENT']._serialized_end=2594 - _globals['_CONTINUEASNEWEVENT']._serialized_start=2596 - _globals['_CONTINUEASNEWEVENT']._serialized_end=2661 - _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_start=2663 - _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_end=2733 - _globals['_EXECUTIONRESUMEDEVENT']._serialized_start=2735 - _globals['_EXECUTIONRESUMEDEVENT']._serialized_end=2803 - _globals['_EXECUTIONSTALLEDEVENT']._serialized_start=2805 - _globals['_EXECUTIONSTALLEDEVENT']._serialized_end=2902 - _globals['_HISTORYEVENT']._serialized_start=2905 - _globals['_HISTORYEVENT']._serialized_end=4097 + _globals['_TIMERORIGINACTIVITYRETRY']._serialized_start=1942 + _globals['_TIMERORIGINACTIVITYRETRY']._serialized_end=1993 + _globals['_TIMERORIGINCHILDWORKFLOWRETRY']._serialized_start=1995 + _globals['_TIMERORIGINCHILDWORKFLOWRETRY']._serialized_end=2046 + _globals['_TIMERCREATEDEVENT']._serialized_start=2049 + _globals['_TIMERCREATEDEVENT']._serialized_end=2456 + _globals['_TIMERFIREDEVENT']._serialized_start=2458 + _globals['_TIMERFIREDEVENT']._serialized_end=2536 + _globals['_WORKFLOWSTARTEDEVENT']._serialized_start=2538 + _globals['_WORKFLOWSTARTEDEVENT']._serialized_end=2612 + _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_start=2614 + _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_end=2638 + _globals['_EVENTSENTEVENT']._serialized_start=2640 + _globals['_EVENTSENTEVENT']._serialized_end=2735 + _globals['_EVENTRAISEDEVENT']._serialized_start=2737 + _globals['_EVENTRAISEDEVENT']._serialized_end=2814 + _globals['_CONTINUEASNEWEVENT']._serialized_start=2816 + _globals['_CONTINUEASNEWEVENT']._serialized_end=2881 + _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_start=2883 + _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_end=2953 + _globals['_EXECUTIONRESUMEDEVENT']._serialized_start=2955 + _globals['_EXECUTIONRESUMEDEVENT']._serialized_end=3023 + _globals['_EXECUTIONSTALLEDEVENT']._serialized_start=3025 + _globals['_EXECUTIONSTALLEDEVENT']._serialized_end=3122 + _globals['_HISTORYEVENT']._serialized_start=3125 + _globals['_HISTORYEVENT']._serialized_end=4317 # @@protoc_insertion_point(module_scope) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi index 26ff5cfd..66ba6ba6 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi @@ -351,6 +351,44 @@ class TimerOriginExternalEvent(_message.Message): Global___TimerOriginExternalEvent: _TypeAlias = TimerOriginExternalEvent # noqa: Y015 +@_typing.final +class TimerOriginActivityRetry(_message.Message): + """Indicates the timer was created as a retry delay for an activity execution.""" + + DESCRIPTOR: _descriptor.Descriptor + + TASKEXECUTIONID_FIELD_NUMBER: _builtins.int + taskExecutionId: _builtins.str + """The task execution ID of the activity being retried.""" + def __init__( + self, + *, + taskExecutionId: _builtins.str = ..., + ) -> None: ... + _ClearFieldArgType: _TypeAlias = _typing.Literal["taskExecutionId", b"taskExecutionId"] # noqa: Y015 + def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + +Global___TimerOriginActivityRetry: _TypeAlias = TimerOriginActivityRetry # noqa: Y015 + +@_typing.final +class TimerOriginChildWorkflowRetry(_message.Message): + """Indicates the timer was created as a retry delay for a child workflow execution.""" + + DESCRIPTOR: _descriptor.Descriptor + + INSTANCEID_FIELD_NUMBER: _builtins.int + instanceId: _builtins.str + """The instance ID of the workflow being retried.""" + def __init__( + self, + *, + instanceId: _builtins.str = ..., + ) -> None: ... + _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId"] # noqa: Y015 + def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + +Global___TimerOriginChildWorkflowRetry: _TypeAlias = TimerOriginChildWorkflowRetry # noqa: Y015 + @_typing.final class TimerCreatedEvent(_message.Message): DESCRIPTOR: _descriptor.Descriptor @@ -360,6 +398,8 @@ class TimerCreatedEvent(_message.Message): RERUNPARENTINSTANCEINFO_FIELD_NUMBER: _builtins.int CREATETIMER_FIELD_NUMBER: _builtins.int EXTERNALEVENT_FIELD_NUMBER: _builtins.int + ACTIVITYRETRY_FIELD_NUMBER: _builtins.int + CHILDWORKFLOWRETRY_FIELD_NUMBER: _builtins.int name: _builtins.str @_builtins.property def fireAt(self) -> _timestamp_pb2.Timestamp: ... @@ -373,6 +413,10 @@ class TimerCreatedEvent(_message.Message): def createTimer(self) -> Global___TimerOriginCreateTimer: ... @_builtins.property def externalEvent(self) -> Global___TimerOriginExternalEvent: ... + @_builtins.property + def activityRetry(self) -> Global___TimerOriginActivityRetry: ... + @_builtins.property + def childWorkflowRetry(self) -> Global___TimerOriginChildWorkflowRetry: ... def __init__( self, *, @@ -381,16 +425,18 @@ class TimerCreatedEvent(_message.Message): rerunParentInstanceInfo: _orchestration_pb2.RerunParentInstanceInfo | None = ..., createTimer: Global___TimerOriginCreateTimer | None = ..., externalEvent: Global___TimerOriginExternalEvent | None = ..., + activityRetry: Global___TimerOriginActivityRetry | None = ..., + childWorkflowRetry: Global___TimerOriginChildWorkflowRetry | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 + _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... _WhichOneofReturnType__name: _TypeAlias = _typing.Literal["name"] # noqa: Y015 _WhichOneofArgType__name: _TypeAlias = _typing.Literal["_name", b"_name"] # noqa: Y015 _WhichOneofReturnType__rerunParentInstanceInfo: _TypeAlias = _typing.Literal["rerunParentInstanceInfo"] # noqa: Y015 _WhichOneofArgType__rerunParentInstanceInfo: _TypeAlias = _typing.Literal["_rerunParentInstanceInfo", b"_rerunParentInstanceInfo"] # noqa: Y015 - _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent"] # noqa: Y015 + _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent", "activityRetry", "childWorkflowRetry"] # noqa: Y015 _WhichOneofArgType_origin: _TypeAlias = _typing.Literal["origin", b"origin"] # noqa: Y015 @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__name) -> _WhichOneofReturnType__name | None: ... diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py index d63efa77..718bb76c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py @@ -28,7 +28,7 @@ from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_actions.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xc4\x01\n\x12ScheduleTaskAction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x04 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\tB\t\n\x07_router\"\xc6\x01\n\x19\x43reateChildWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x05 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x42\t\n\x07_router\"\xc9\x01\n\x11\x43reateTimerAction\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x03 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x04 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x42\x08\n\x06originB\x07\n\x05_name\"p\n\x0fSendEventAction\x12#\n\x08instance\x18\x01 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0c\n\x04name\x18\x02 \x01(\t\x12*\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\xaa\x02\n\x16\x43ompleteWorkflowAction\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12-\n\x07\x64\x65tails\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x30\n\nnewVersion\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12&\n\x0f\x63\x61rryoverEvents\x18\x05 \x03(\x0b\x32\r.HistoryEvent\x12+\n\x0e\x66\x61ilureDetails\x18\x06 \x01(\x0b\x32\x13.TaskFailureDetails\"l\n\x17TerminateWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x03 \x01(\x08\"#\n!WorkflowVersionNotAvailableAction\"\xd6\x03\n\x0eWorkflowAction\x12\n\n\x02id\x18\x01 \x01(\x05\x12+\n\x0cscheduleTask\x18\x02 \x01(\x0b\x32\x13.ScheduleTaskActionH\x00\x12\x39\n\x13\x63reateChildWorkflow\x18\x03 \x01(\x0b\x32\x1a.CreateChildWorkflowActionH\x00\x12)\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x12.CreateTimerActionH\x00\x12%\n\tsendEvent\x18\x05 \x01(\x0b\x32\x10.SendEventActionH\x00\x12\x33\n\x10\x63ompleteWorkflow\x18\x06 \x01(\x0b\x32\x17.CompleteWorkflowActionH\x00\x12\x35\n\x11terminateWorkflow\x18\x07 \x01(\x0b\x32\x18.TerminateWorkflowActionH\x00\x12I\n\x1bworkflowVersionNotAvailable\x18\n \x01(\x0b\x32\".WorkflowVersionNotAvailableActionH\x00\x12 \n\x06router\x18\t \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x14\n\x12workflowActionTypeB\t\n\x07_routerJ\x04\x08\x08\x10\tBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_actions.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xc4\x01\n\x12ScheduleTaskAction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x04 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\tB\t\n\x07_router\"\xc6\x01\n\x19\x43reateChildWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x05 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x42\t\n\x07_router\"\xbb\x02\n\x11\x43reateTimerAction\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x03 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x04 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x12\x32\n\ractivityRetry\x18\x05 \x01(\x0b\x32\x19.TimerOriginActivityRetryH\x00\x12<\n\x12\x63hildWorkflowRetry\x18\x06 \x01(\x0b\x32\x1e.TimerOriginChildWorkflowRetryH\x00\x42\x08\n\x06originB\x07\n\x05_name\"p\n\x0fSendEventAction\x12#\n\x08instance\x18\x01 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0c\n\x04name\x18\x02 \x01(\t\x12*\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\xaa\x02\n\x16\x43ompleteWorkflowAction\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12-\n\x07\x64\x65tails\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x30\n\nnewVersion\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12&\n\x0f\x63\x61rryoverEvents\x18\x05 \x03(\x0b\x32\r.HistoryEvent\x12+\n\x0e\x66\x61ilureDetails\x18\x06 \x01(\x0b\x32\x13.TaskFailureDetails\"l\n\x17TerminateWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x03 \x01(\x08\"#\n!WorkflowVersionNotAvailableAction\"\xd6\x03\n\x0eWorkflowAction\x12\n\n\x02id\x18\x01 \x01(\x05\x12+\n\x0cscheduleTask\x18\x02 \x01(\x0b\x32\x13.ScheduleTaskActionH\x00\x12\x39\n\x13\x63reateChildWorkflow\x18\x03 \x01(\x0b\x32\x1a.CreateChildWorkflowActionH\x00\x12)\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x12.CreateTimerActionH\x00\x12%\n\tsendEvent\x18\x05 \x01(\x0b\x32\x10.SendEventActionH\x00\x12\x33\n\x10\x63ompleteWorkflow\x18\x06 \x01(\x0b\x32\x17.CompleteWorkflowActionH\x00\x12\x35\n\x11terminateWorkflow\x18\x07 \x01(\x0b\x32\x18.TerminateWorkflowActionH\x00\x12I\n\x1bworkflowVersionNotAvailable\x18\n \x01(\x0b\x32\".WorkflowVersionNotAvailableActionH\x00\x12 \n\x06router\x18\t \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x14\n\x12workflowActionTypeB\t\n\x07_routerJ\x04\x08\x08\x10\tBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -41,15 +41,15 @@ _globals['_CREATECHILDWORKFLOWACTION']._serialized_start=338 _globals['_CREATECHILDWORKFLOWACTION']._serialized_end=536 _globals['_CREATETIMERACTION']._serialized_start=539 - _globals['_CREATETIMERACTION']._serialized_end=740 - _globals['_SENDEVENTACTION']._serialized_start=742 - _globals['_SENDEVENTACTION']._serialized_end=854 - _globals['_COMPLETEWORKFLOWACTION']._serialized_start=857 - _globals['_COMPLETEWORKFLOWACTION']._serialized_end=1155 - _globals['_TERMINATEWORKFLOWACTION']._serialized_start=1157 - _globals['_TERMINATEWORKFLOWACTION']._serialized_end=1265 - _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_start=1267 - _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_end=1302 - _globals['_WORKFLOWACTION']._serialized_start=1305 - _globals['_WORKFLOWACTION']._serialized_end=1775 + _globals['_CREATETIMERACTION']._serialized_end=854 + _globals['_SENDEVENTACTION']._serialized_start=856 + _globals['_SENDEVENTACTION']._serialized_end=968 + _globals['_COMPLETEWORKFLOWACTION']._serialized_start=971 + _globals['_COMPLETEWORKFLOWACTION']._serialized_end=1269 + _globals['_TERMINATEWORKFLOWACTION']._serialized_start=1271 + _globals['_TERMINATEWORKFLOWACTION']._serialized_end=1379 + _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_start=1381 + _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_end=1416 + _globals['_WORKFLOWACTION']._serialized_start=1419 + _globals['_WORKFLOWACTION']._serialized_end=1889 # @@protoc_insertion_point(module_scope) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi index 1b754f78..8df2644c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi @@ -104,6 +104,8 @@ class CreateTimerAction(_message.Message): NAME_FIELD_NUMBER: _builtins.int CREATETIMER_FIELD_NUMBER: _builtins.int EXTERNALEVENT_FIELD_NUMBER: _builtins.int + ACTIVITYRETRY_FIELD_NUMBER: _builtins.int + CHILDWORKFLOWRETRY_FIELD_NUMBER: _builtins.int name: _builtins.str @_builtins.property def fireAt(self) -> _timestamp_pb2.Timestamp: ... @@ -111,6 +113,10 @@ class CreateTimerAction(_message.Message): def createTimer(self) -> _history_events_pb2.TimerOriginCreateTimer: ... @_builtins.property def externalEvent(self) -> _history_events_pb2.TimerOriginExternalEvent: ... + @_builtins.property + def activityRetry(self) -> _history_events_pb2.TimerOriginActivityRetry: ... + @_builtins.property + def childWorkflowRetry(self) -> _history_events_pb2.TimerOriginChildWorkflowRetry: ... def __init__( self, *, @@ -118,14 +124,16 @@ class CreateTimerAction(_message.Message): name: _builtins.str | None = ..., createTimer: _history_events_pb2.TimerOriginCreateTimer | None = ..., externalEvent: _history_events_pb2.TimerOriginExternalEvent | None = ..., + activityRetry: _history_events_pb2.TimerOriginActivityRetry | None = ..., + childWorkflowRetry: _history_events_pb2.TimerOriginChildWorkflowRetry | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 + _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... _WhichOneofReturnType__name: _TypeAlias = _typing.Literal["name"] # noqa: Y015 _WhichOneofArgType__name: _TypeAlias = _typing.Literal["_name", b"_name"] # noqa: Y015 - _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent"] # noqa: Y015 + _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent", "activityRetry", "childWorkflowRetry"] # noqa: Y015 _WhichOneofArgType_origin: _TypeAlias = _typing.Literal["origin", b"origin"] # noqa: Y015 @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__name) -> _WhichOneofReturnType__name | None: ... diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index d1a211dc..2f4c2e08 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -166,21 +166,25 @@ def call_sub_orchestrator( """ pass - # TODO: Add a timeout parameter, which allows the task to be canceled if the event is - # not received within the specified timeout. This requires support for task cancellation. @abstractmethod - def wait_for_external_event(self, name: str) -> Task: + def wait_for_external_event( + self, name: str, *, timeout: Optional[Union[datetime, timedelta]] = None + ) -> Task: """Wait asynchronously for an event to be raised with the name `name`. Parameters ---------- name : str The event name of the event that the task is waiting for. + timeout : datetime | timedelta | None + Optional deadline or duration after which a ``TimeoutError`` is raised + if the event has not been received. Returns ------- Task[TOutput] - A Durable Task that completes when the event is received. + A Durable Task that completes when the event is received or fails + with ``TimeoutError`` if the timeout fires first. """ pass @@ -494,6 +498,34 @@ def on_child_completed(self, task: Task): self._parent.on_child_completed(self) +class ExternalEventWithTimeoutTask(CompositeTask[T]): + """A task that waits for an external event with a timeout. + + Completes with the event data if the event arrives first, or raises + ``TimeoutError`` if the timer fires first. + """ + + def __init__(self, event_task: CompletableTask, timer_task: TimerTask): + self._event_task = event_task + self._timer_task = timer_task + super().__init__([event_task, timer_task]) + + def on_child_completed(self, completed_task: Task): + if self.is_complete: + return + if completed_task is self._event_task: + if completed_task.is_failed: + self._exception = completed_task.get_exception() + else: + self._result = completed_task.get_result() + self._is_complete = True + elif completed_task is self._timer_task: + self._exception = TimeoutError('The operation timed out waiting for an external event') + self._is_complete = True + if self._is_complete and self._parent is not None: + self._parent.on_child_completed(self) + + def when_all(tasks: list[Task[T]]) -> WhenAllTask[T]: """Returns a task that completes when all of the provided tasks complete or when one of the tasks fail.""" return WhenAllTask(tasks) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 7de121a1..1fb4c7ce 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1216,17 +1216,21 @@ def set_custom_status(self, custom_status: str) -> None: self._encoded_custom_status = custom_status def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: - return self.create_timer_internal(fire_at) + return self.create_timer_internal( + fire_at, + origin=pb.TimerOriginCreateTimer(), + ) def create_timer_internal( self, fire_at: Union[datetime, timedelta], retryable_task: Optional[task.RetryableTask] = None, + origin: Optional[ph.TimerOrigin] = None, ) -> task.Task: id = self.next_sequence_number() if isinstance(fire_at, timedelta): fire_at = self.current_utc_datetime + fire_at - action = ph.new_create_timer_action(id, fire_at) + action = ph.new_create_timer_action(id, fire_at, origin=origin) self._pending_actions[id] = action timer_task: task.TimerTask = task.TimerTask() @@ -1345,7 +1349,12 @@ def call_activity_function_helper( ) self._pending_tasks[id] = fn_task - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: # Check to see if this event has already been received, in which case we # can return it immediately. Otherwise, record out intent to receive an # event with the given name so that we can resume the generator when it @@ -1365,7 +1374,16 @@ def wait_for_external_event(self, name: str) -> task.Task: task_list = [] self._pending_events[event_name] = task_list task_list.append(external_event_task) - return external_event_task + + if external_event_task.is_complete: + return external_event_task + + fire_at = timeout if timeout is not None else datetime(9999, 12, 31, 23, 59, 59) + timer_task = self.create_timer_internal( + fire_at, + origin=pb.TimerOriginExternalEvent(name=name), + ) + return task.ExternalEventWithTimeoutTask(external_event_task, timer_task) def continue_as_new(self, new_input, *, save_events: bool = False) -> None: if self._is_complete: @@ -1642,7 +1660,13 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven ctx.resume() else: activity_task.increment_attempt_count() - ctx.create_timer_internal(next_delay, activity_task) + ctx.create_timer_internal( + next_delay, + activity_task, + origin=pb.TimerOriginActivityRetry( + taskExecutionId=activity_task._task_execution_id, + ), + ) elif isinstance(activity_task, task.CompletableTask): activity_task.fail( f'{ctx.instance_id}: Activity task #{task_id} failed: {event.taskFailed.failureDetails.errorMessage}', @@ -1717,7 +1741,13 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven ctx.resume() else: sub_orch_task.increment_attempt_count() - ctx.create_timer_internal(next_delay, sub_orch_task) + ctx.create_timer_internal( + next_delay, + sub_orch_task, + origin=pb.TimerOriginChildWorkflowRetry( + instanceId=sub_orch_task._instance_id, + ), + ) elif isinstance(sub_orch_task, task.CompletableTask): sub_orch_task.fail( f'Sub-orchestration task #{task_id} failed: {failedEvent.failureDetails.errorMessage}', diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index f0c98a70..1f9f6417 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -145,9 +145,14 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id ) - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: self._logger.debug(f'{self.instance_id}: Waiting for external event {name}') - return self.__obj.wait_for_external_event(name) + return self.__obj.wait_for_external_event(name, timeout=timeout) def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: self._logger.debug(f'{self.instance_id}: Continuing as new') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index bfe5ce98..6f5610e2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -162,18 +162,27 @@ def call_child_workflow( pass @abstractmethod - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: """Wait asynchronously for an event to be raised with the name `name`. Parameters ---------- name : str The event name of the event that the task is waiting for. + timeout : datetime | timedelta | None + Optional deadline or duration after which a ``TimeoutError`` is raised + if the event has not been received. Returns ------- Task[TOutput] - A Durable Task that completes when the event is received. + A Durable Task that completes when the event is received or fails + with ``TimeoutError`` if the timeout fires first. """ pass diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 0bc5c998..88f0cddc 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -912,16 +912,18 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), ] - # Execute the orchestration until it is waiting for an external event. The result - # should be an empty list of actions because the orchestration didn't schedule any work. + # Execute the orchestration until it is waiting for an external event. A timer + # action is created for the external event wait (far-future deadline). executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 0 + assert len(actions) == 1 + assert actions[0].HasField('createTimer') # Now send an external event to the orchestration and execute it again. This time # the orchestration should complete. - old_events = new_events + far_future = datetime(9999, 12, 31, 23, 59, 59) + old_events = new_events + [helpers.new_timer_created_event(1, far_future)] new_events = [helpers.new_event_raised_event('my_event', encoded_input='42')] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) @@ -979,9 +981,12 @@ def orchestrator(ctx: task.OrchestrationContext, _): registry = worker._Registry() orchestrator_name = registry.add_orchestrator(orchestrator) + far_future = datetime(9999, 12, 31, 23, 59, 59) old_events = [ helpers.new_workflow_started_event(), helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), + # The external event wait creates a timer even when no timeout is specified + helpers.new_timer_created_event(1, far_future), ] new_events = [ helpers.new_suspend_event(), @@ -1754,6 +1759,404 @@ def parent(ctx: task.OrchestrationContext, _): ) +def test_create_timer_sets_create_timer_origin(): + """Tests that create_timer sets TimerOriginCreateTimer on the CreateTimerAction.""" + + def delay_orchestrator(ctx: task.OrchestrationContext, _): + due_time = ctx.current_utc_datetime + timedelta(seconds=5) + yield ctx.create_timer(due_time) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(delay_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'createTimer' + + +def test_wait_for_external_event_timeout_sets_external_event_origin(): + """Tests that wait_for_external_event with timeout creates a timer with + TimerOriginExternalEvent origin containing the event name.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=30)) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # The only action should be the timer (the external event wait doesn't produce an action) + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'externalEvent' + assert timer_action.externalEvent.name == 'myEvent' + + +def test_wait_for_external_event_timeout_fires_raises_timeout_error(): + """Tests that when the timeout fires before the event arrives, the task + raises a TimeoutError.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=5)) + return f'got: {result}' + except TimeoutError: + return 'timed out' + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + fire_at = start_time + timedelta(seconds=5) + + # First execution: creates the timer + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, fire_at), + ] + # Timer fires before event arrives + new_events = [ + helpers.new_timer_fired_event(1, fire_at), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('timed out') + + +def test_wait_for_external_event_with_timeout_event_arrives_first(): + """Tests that when the event arrives before the timeout, the task completes + with the event data.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=30)) + return f'got: {result}' + except TimeoutError: + return 'timed out' + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + + # First execution: creates the timer (id=1) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, start_time + timedelta(seconds=30)), + ] + # Event arrives before the timer fires + new_events = [ + helpers.new_event_raised_event('myEvent', json.dumps('hello')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('got: hello') + + +def test_wait_for_external_event_no_timeout_creates_far_future_timer(): + """Tests that wait_for_external_event without timeout still creates a timer + with a far-future deadline (year 9999) and TimerOriginExternalEvent origin.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # A timer is always created for external event waits + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'externalEvent' + assert timer_action.externalEvent.name == 'myEvent' + # The timer should fire far in the future (year 9999) + assert timer_action.fireAt.ToDatetime().year == 9999 + + +def test_activity_retry_timer_sets_activity_retry_origin(): + """Tests that retry timers for failed activities set TimerOriginActivityRetry + with the correct taskExecutionId.""" + + def dummy_activity(ctx, _): + raise ValueError('boom') + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # Attempt 1: scheduleTask(id=1) fails + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_task_failed_event(1, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # The retry timer should have activityRetry origin + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'activityRetry' + assert timer_action.activityRetry.taskExecutionId != '' + + +def test_activity_retry_task_execution_id_stable_across_retries(): + """Tests that the taskExecutionId in TimerOriginActivityRetry is stable + across multiple retry attempts of the same logical activity call.""" + + def dummy_activity(ctx, _): + raise ValueError('boom') + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=4, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # Attempt 1: scheduleTask(id=1) fails -> retry timer(id=2) + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_task_failed_event(1, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_1 = actions[0].createTimer + first_task_execution_id = retry_timer_1.activityRetry.taskExecutionId + assert first_task_execution_id != '' + + # Timer fires -> scheduleTask(id=3), then fails -> retry timer(id=4) + old_events = old_events + new_events + current_timestamp = current_timestamp + timedelta(seconds=1) + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), + helpers.new_timer_fired_event(2, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('scheduleTask') + + # Attempt 2 fails + old_events = old_events + new_events + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_scheduled_event(3, task.get_name(dummy_activity)), + helpers.new_task_failed_event(3, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_2 = actions[0].createTimer + second_task_execution_id = retry_timer_2.activityRetry.taskExecutionId + + # Both retry timers must carry the SAME taskExecutionId + assert second_task_execution_id == first_task_execution_id + + +def test_child_workflow_retry_timer_sets_child_workflow_retry_origin(): + """Tests that retry timers for failed child workflows set + TimerOriginChildWorkflowRetry with the correct instanceId.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + raise ValueError('child failed') + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # First child created with id=1 -> instance_id = "abc123:0001" + expected_first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event( + 1, 'child_orchestrator', expected_first_child_id + ), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_child_workflow_failed_event(1, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'childWorkflowRetry' + assert timer_action.childWorkflowRetry.instanceId == expected_first_child_id + + +def test_child_workflow_retry_instance_id_always_points_to_first_child(): + """Tests that the instanceId in TimerOriginChildWorkflowRetry always + references the first child workflow's instance ID, even across multiple retries.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + raise ValueError('child failed') + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=4, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # First child: sub-orch(id=1) -> instance_id = "abc123:0001" + expected_first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event( + 1, 'child_orchestrator', expected_first_child_id + ), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_child_workflow_failed_event(1, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_1 = actions[0].createTimer + assert retry_timer_1.childWorkflowRetry.instanceId == expected_first_child_id + + # Timer fires -> new child sub-orch(id=3) with a DIFFERENT instance_id + old_events = old_events + new_events + current_timestamp = current_timestamp + timedelta(seconds=1) + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), + helpers.new_timer_fired_event(2, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createChildWorkflow') + + # Second child fails + old_events = old_events + new_events + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_created_event( + 3, 'child_orchestrator', expected_first_child_id + ), + helpers.new_child_workflow_failed_event(3, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_2 = actions[0].createTimer + + # Retry timer 2 must ALSO point to the first child's instance ID + assert retry_timer_2.childWorkflowRetry.instanceId == expected_first_child_id + + def get_and_validate_single_complete_workflow_action( actions: list[pb.WorkflowAction], ) -> pb.CompleteWorkflowAction: From b6d99bd795a99967baf869d12027ba2719893517 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Fri, 10 Apr 2026 15:18:24 +0200 Subject: [PATCH 02/13] ruff Signed-off-by: Albert Callarisa --- .../tests/durabletask/test_orchestration_executor.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 88f0cddc..1d0838d9 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2060,9 +2060,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): old_events = [ helpers.new_workflow_started_event(timestamp=current_timestamp), helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), - helpers.new_child_workflow_created_event( - 1, 'child_orchestrator', expected_first_child_id - ), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', expected_first_child_id), ] new_events = [ helpers.new_workflow_started_event(timestamp=current_timestamp), @@ -2107,9 +2105,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): old_events = [ helpers.new_workflow_started_event(timestamp=current_timestamp), helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), - helpers.new_child_workflow_created_event( - 1, 'child_orchestrator', expected_first_child_id - ), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', expected_first_child_id), ] new_events = [ helpers.new_workflow_started_event(timestamp=current_timestamp), @@ -2141,9 +2137,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): old_events = old_events + new_events new_events = [ helpers.new_workflow_started_event(current_timestamp), - helpers.new_child_workflow_created_event( - 3, 'child_orchestrator', expected_first_child_id - ), + helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_first_child_id), helpers.new_child_workflow_failed_event(3, ValueError('child failed')), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) From b2c7d874a2228747f6c7058ec575cd2578ecf354 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:03:30 +0200 Subject: [PATCH 03/13] Backwards compatible external event timer Signed-off-by: Albert Callarisa --- .../workflow/_durabletask/internal/helpers.py | 68 ++++- .../dapr/ext/workflow/_durabletask/task.py | 4 +- .../dapr/ext/workflow/_durabletask/worker.py | 102 ++++++- .../test_orchestration_executor.py | 264 ++++++++++++++++-- 4 files changed, 407 insertions(+), 31 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index 0f63cde9..c296d9ec 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -30,6 +30,47 @@ pb.TimerOriginChildWorkflowRetry: 'childWorkflowRetry', } +# Sentinel fireAt used for "optional" TimerOriginExternalEvent timers that back an +# indefinite wait_for_external_event. The sentinel is 9999-12-31T23:59:59.999999999Z +# (nanosecond precision — cannot be represented with Python's datetime, which only +# supports microseconds, so we build the Timestamp directly). +OPTIONAL_TIMER_FIRE_AT: timestamp_pb2.Timestamp = timestamp_pb2.Timestamp( + seconds=253402300799, nanos=999999999 +) + + +def is_optional_timer_action(action: pb.WorkflowAction) -> bool: + """Returns True if the action is an optional TimerOriginExternalEvent timer + with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event. + + Pre-patch histories (from prior SDK versions that didn't schedule a timer for + indefinite waits) won't carry a matching TimerCreatedEvent; the replay logic + uses this check to drop the optional action and shift sequence ids. + """ + if not action.HasField('createTimer'): + return False + timer = action.createTimer + if timer.WhichOneof('origin') != 'externalEvent': + return False + return ( + timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds + and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos + ) + + +def is_optional_timer_event(event: pb.HistoryEvent) -> bool: + """Returns True if a TimerCreatedEvent is the optional TimerOriginExternalEvent + sentinel timer.""" + if not event.HasField('timerCreated'): + return False + timer = event.timerCreated + if timer.WhichOneof('origin') != 'externalEvent': + return False + return ( + timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds + and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos + ) + # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere @@ -54,13 +95,21 @@ def new_execution_started_event( ) -def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent: - ts = timestamp_pb2.Timestamp() - ts.FromDatetime(fire_at) +def new_timer_created_event( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, +) -> pb.HistoryEvent: + if isinstance(fire_at, timestamp_pb2.Timestamp): + ts = fire_at + else: + ts = timestamp_pb2.Timestamp() + ts.FromDatetime(fire_at) + origin_kwargs = {_ORIGIN_FIELD[type(origin)]: origin} if origin is not None else {} return pb.HistoryEvent( eventId=timer_id, timestamp=timestamp_pb2.Timestamp(), - timerCreated=pb.TimerCreatedEvent(fireAt=ts), + timerCreated=pb.TimerCreatedEvent(fireAt=ts, **origin_kwargs), ) @@ -217,10 +266,15 @@ def new_workflow_version_not_available_action( def new_create_timer_action( - id: int, fire_at: datetime, origin: Optional[TimerOrigin] = None + id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, ) -> pb.WorkflowAction: - timestamp = timestamp_pb2.Timestamp() - timestamp.FromDatetime(fire_at) + if isinstance(fire_at, timestamp_pb2.Timestamp): + timestamp = fire_at + else: + timestamp = timestamp_pb2.Timestamp() + timestamp.FromDatetime(fire_at) origin_kwargs = {_ORIGIN_FIELD[type(origin)]: origin} if origin is not None else {} return pb.WorkflowAction( id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp, **origin_kwargs) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index 2f4c2e08..d09f29dc 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -288,7 +288,7 @@ class Task(ABC, Generic[T]): """Abstract base class for asynchronous tasks in a durable orchestration.""" _result: T - _exception: Optional[TaskFailedError] + _exception: Optional[Exception] _parent: Optional[CompositeTask[T]] def __init__(self) -> None: @@ -315,7 +315,7 @@ def get_result(self) -> T: raise self._exception return self._result - def get_exception(self) -> TaskFailedError: + def get_exception(self) -> Exception: """Returns the exception that caused the task to fail.""" if self._exception is None: raise ValueError('The task has not failed.') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 1fb4c7ce..f0b385da 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -30,7 +30,7 @@ import grpc from dapr.ext.workflow._durabletask import deterministic, task from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl -from google.protobuf import empty_pb2 +from google.protobuf import empty_pb2, timestamp_pb2 TInput = TypeVar('TInput') TOutput = TypeVar('TOutput') @@ -1068,6 +1068,11 @@ def run(self, generator: Generator[task.Task, Any, Any]): task = next(generator) # this starts the generator # TODO: Check if the task is null? self._previous_task = task + # If the first yielded task is already complete (e.g. wait_for_external_event + # with timeout=0 returns an immediately-canceled task), drive the generator + # forward now so the orchestrator doesn't get stuck waiting for an event. + if self._previous_task is not None and self._previous_task.is_complete: + self.resume() def resume(self): if self._generator is None: @@ -1183,6 +1188,40 @@ def next_sequence_number(self) -> int: self._sequence_number += 1 return self._sequence_number + def _drop_optional_pending_at(self, action_id: int) -> bool: + """Drop the pending optional timer action at ``action_id`` and shift every + pending action and pending task with id greater than ``action_id`` down by + one, also decrementing the sequence counter. + + Returns True if the action at ``action_id`` was an optional timer and was + dropped; False otherwise (caller should fall through to the normal + non-determinism error path). + + This preserves sequence-id determinism when replaying a pre-patch history + where an indefinite wait_for_external_event did not reserve a sequence + number for its optional timer. + """ + action = self._pending_actions.get(action_id) + if action is None or not ph.is_optional_timer_action(action): + return False + + del self._pending_actions[action_id] + self._pending_tasks.pop(action_id, None) + + # Shift every id > action_id down by one. + higher_action_ids = sorted(k for k in self._pending_actions if k > action_id) + for old_id in higher_action_ids: + a = self._pending_actions.pop(old_id) + a.id = old_id - 1 + self._pending_actions[old_id - 1] = a + higher_task_ids = sorted(k for k in self._pending_tasks if k > action_id) + for old_id in higher_task_ids: + t = self._pending_tasks.pop(old_id) + self._pending_tasks[old_id - 1] = t + + self._sequence_number -= 1 + return True + @property def app_id(self) -> str: return self._app_id @@ -1223,10 +1262,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: def create_timer_internal( self, - fire_at: Union[datetime, timedelta], + fire_at: Union[datetime, timedelta, timestamp_pb2.Timestamp], retryable_task: Optional[task.RetryableTask] = None, origin: Optional[ph.TimerOrigin] = None, - ) -> task.Task: + ) -> task.TimerTask: id = self.next_sequence_number() if isinstance(fire_at, timedelta): fire_at = self.current_utc_datetime + fire_at @@ -1378,7 +1417,36 @@ def wait_for_external_event( if external_event_task.is_complete: return external_event_task - fire_at = timeout if timeout is not None else datetime(9999, 12, 31, 23, 59, 59) + # Three timeout shapes: + # - timedelta(0): immediately-canceled task (no timer) + # - positive: normal TimerOriginExternalEvent timer + # - None or negative: indefinite wait — an OPTIONAL timer is scheduled + # with a sentinel fireAt. See ph.OPTIONAL_TIMER_FIRE_AT + # and the replay-shift logic for how older histories + # (which didn't schedule the timer) are tolerated. + if isinstance(timeout, timedelta) and timeout == timedelta(0): + # Remove the task we just registered in _pending_events so the event + # doesn't try to complete it later. + pending = self._pending_events.get(event_name) + if pending and external_event_task in pending: + pending.remove(external_event_task) + if not pending: + del self._pending_events[event_name] + external_event_task._is_complete = True + external_event_task._exception = TimeoutError( + 'The operation timed out waiting for an external event' + ) + return external_event_task + + is_indefinite = timeout is None or ( + isinstance(timeout, timedelta) and timeout < timedelta(0) + ) + fire_at: Union[datetime, timestamp_pb2.Timestamp] + if is_indefinite: + fire_at = ph.OPTIONAL_TIMER_FIRE_AT + else: + fire_at = timeout # type: ignore[assignment] + timer_task = self.create_timer_internal( fire_at, origin=pb.TimerOriginExternalEvent(name=name), @@ -1561,6 +1629,18 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the timer was successfully scheduled. # Remove the timerCreated event from the pending action list so we don't schedule it again. timer_id = event.eventId + # Asymmetric case: pending is an optional timer but the incoming + # TimerCreated is a different (non-optional) timer — e.g., a user + # CreateTimer emitted by pre-patch code right after an indefinite + # wait_for_external_event. Drop the optional and shift so the + # real timer matches at the same id. + pending = ctx._pending_actions.get(timer_id) + if ( + pending is not None + and ph.is_optional_timer_action(pending) + and not ph.is_optional_timer_event(event) + ): + ctx._drop_optional_pending_at(timer_id) action = ctx._pending_actions.pop(timer_id, None) if not action: raise _get_non_determinism_error(timer_id, task.get_name(ctx.create_timer)) @@ -1598,6 +1678,14 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the activity execution was successfully scheduled. # Remove the taskScheduled event from the pending action list so we don't schedule it again. task_id = event.eventId + # If the pending action at this id is an optional timer from an + # indefinite wait_for_external_event that wasn't present in the + # pre-patch history, drop it and shift so this schedule matches. + if ( + task_id in ctx._pending_actions + and ph.is_optional_timer_action(ctx._pending_actions[task_id]) + ): + ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) activity_task = ctx._pending_tasks.get(task_id, None) if not action: @@ -1679,6 +1767,12 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the sub-orchestration execution was successfully scheduled. # Remove the childWorkflowInstanceCreated event from the pending action list so we don't schedule it again. task_id = event.eventId + # If the pending action at this id is an optional timer, drop+shift. + if ( + task_id in ctx._pending_actions + and ph.is_optional_timer_action(ctx._pending_actions[task_id]) + ): + ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) if not action: raise _get_non_determinism_error( diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 1d0838d9..77c11825 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -912,18 +912,23 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), ] - # Execute the orchestration until it is waiting for an external event. A timer - # action is created for the external event wait (far-future deadline). + # Execute the orchestration until it is waiting for an external event. An + # optional TimerOriginExternalEvent timer is scheduled (sentinel fireAt). executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions assert len(actions) == 1 assert actions[0].HasField('createTimer') - # Now send an external event to the orchestration and execute it again. This time - # the orchestration should complete. - far_future = datetime(9999, 12, 31, 23, 59, 59) - old_events = new_events + [helpers.new_timer_created_event(1, far_future)] + # Post-patch replay: history contains the matching optional TimerCreated. The + # orchestration completes normally on event arrival. + old_events = new_events + [ + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='my_event'), + ) + ] new_events = [helpers.new_event_raised_event('my_event', encoded_input='42')] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) @@ -981,12 +986,16 @@ def orchestrator(ctx: task.OrchestrationContext, _): registry = worker._Registry() orchestrator_name = registry.add_orchestrator(orchestrator) - far_future = datetime(9999, 12, 31, 23, 59, 59) old_events = [ helpers.new_workflow_started_event(), helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), - # The external event wait creates a timer even when no timeout is specified - helpers.new_timer_created_event(1, far_future), + # The external event wait creates an optional TimerOriginExternalEvent timer + # with the sentinel fireAt. Post-patch history contains the matching event. + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='my_event'), + ), ] new_events = [ helpers.new_suspend_event(), @@ -1884,9 +1893,9 @@ def timeout_orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == json.dumps('got: hello') -def test_wait_for_external_event_no_timeout_creates_far_future_timer(): - """Tests that wait_for_external_event without timeout still creates a timer - with a far-future deadline (year 9999) and TimerOriginExternalEvent origin.""" +def test_wait_for_external_event_indefinite_emits_optional_timer(): + """WaitForExternalEvent with no timeout (or a negative timeout) emits an + optional timer whose fireAt is the exact sentinel 9999-12-31T23:59:59.999999999Z.""" def orchestrator(ctx: task.OrchestrationContext, _): result = yield ctx.wait_for_external_event('myEvent') @@ -1904,14 +1913,228 @@ def orchestrator(ctx: task.OrchestrationContext, _): result = executor.execute(TEST_INSTANCE_ID, [], new_events) actions = result.actions - # A timer is always created for external event waits assert len(actions) == 1 assert actions[0].HasField('createTimer') timer_action = actions[0].createTimer assert timer_action.WhichOneof('origin') == 'externalEvent' assert timer_action.externalEvent.name == 'myEvent' - # The timer should fire far in the future (year 9999) - assert timer_action.fireAt.ToDatetime().year == 9999 + # Exact sentinel match — bit-for-bit, including nanoseconds. + assert timer_action.fireAt.seconds == helpers.OPTIONAL_TIMER_FIRE_AT.seconds + assert timer_action.fireAt.nanos == helpers.OPTIONAL_TIMER_FIRE_AT.nanos + assert helpers.is_optional_timer_action(actions[0]) + + +def test_wait_for_external_event_zero_timeout_emits_no_timer(): + """WaitForExternalEvent with timeout=0 emits no timer and the returned task is + immediately canceled (fails with TimeoutError).""" + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(0)) + return f'got: {result}' + except TimeoutError: + return 'canceled' + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Only the completion action is emitted — no CreateTimerAction. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('canceled') + + +def test_post_patch_replay_optional_timer_matches_history(): + """A post-patch history containing the optional TimerCreated event replays + through the normal match path without shifting.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='myEvent'), + ), + ] + new_events = [helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('ok') + + +def test_pre_patch_replay_indefinite_wait_then_activity(): + """A pre-patch history has the activity scheduled at id=1 (no reserved id for + the indefinite wait). The replay must drop the optional timer, shift the + activity down to id=1, and complete cleanly.""" + + def dummy_activity(ctx, _): + return 'activity result' + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + result = yield ctx.call_activity(dummy_activity) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + registry.add_activity(dummy_activity) + + # Pre-patch history: no timerCreated for the wait, activity scheduled at id=1 + # (which would have been id=2 under post-patch numbering). + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [helpers.new_task_completed_event(1, json.dumps('activity result'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # A single completion action — no phantom createTimer leaks. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('activity result') + + +def test_pre_patch_replay_indefinite_wait_then_child_workflow(): + """A pre-patch history with a child workflow scheduled after an indefinite + wait — the shift logic must work for childWorkflowInstanceCreated too.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + return 'child result' + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + result = yield ctx.call_sub_orchestrator('child_orchestrator') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: child workflow scheduled at id=1 (no reserved id for wait). + child_instance_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', child_instance_id), + ] + new_events = [ + helpers.new_child_workflow_completed_event(1, json.dumps('child result')) + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('child result') + + +def test_pre_patch_replay_indefinite_wait_then_user_create_timer(): + """A pre-patch history has a user CreateTimer right after an indefinite wait. + Both the pending action and the incoming event are CreateTimer — the SDK must + distinguish optional (externalEvent + sentinel) from non-optional and shift.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + yield ctx.create_timer(timedelta(seconds=5)) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: a non-optional (user) TimerCreated at id=1. The + # post-patch code would have emitted the optional timer at id=1 and the user + # timer at id=2; the shift must drop the optional and match the user timer. + user_fire_at = start_time + timedelta(seconds=5) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_timer_created_event( + 1, + user_fire_at, + origin=pb.TimerOriginCreateTimer(), + ), + ] + new_events = [helpers.new_timer_fired_event(1, user_fire_at)] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('done') + + +def test_pre_patch_replay_two_indefinite_waits(): + """Two indefinite waits in sequence. Shifts must compose across multiple + optional timers.""" + + def dummy_activity(ctx, _): + return 'act result' + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('A') + yield ctx.call_activity(dummy_activity) + yield ctx.wait_for_external_event('B') + result = yield ctx.call_activity(dummy_activity) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + registry.add_activity(dummy_activity) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch numbering: first activity id=1, second activity id=2 (both waits + # consumed no sequence numbers). + activity_name = task.get_name(dummy_activity) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('A', encoded_input=json.dumps('a')), + helpers.new_task_scheduled_event(1, activity_name), + helpers.new_task_completed_event(1, json.dumps('act result')), + helpers.new_event_raised_event('B', encoded_input=json.dumps('b')), + helpers.new_task_scheduled_event(2, activity_name), + ] + new_events = [helpers.new_task_completed_event(2, json.dumps('act result'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('act result') def test_activity_retry_timer_sets_activity_retry_origin(): @@ -2133,11 +2356,14 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): assert len(actions) == 1 assert actions[0].HasField('createChildWorkflow') - # Second child fails + # Second child fails. Simulate the second child having a DIFFERENT instance ID + # (e.g. if a backend assigned a new ID on retry) to prove the timer origin still + # references the FIRST child's ID regardless. + expected_second_child_id = f'{TEST_INSTANCE_ID}:0003' old_events = old_events + new_events new_events = [ helpers.new_workflow_started_event(current_timestamp), - helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_first_child_id), + helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_second_child_id), helpers.new_child_workflow_failed_event(3, ValueError('child failed')), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) @@ -2147,8 +2373,10 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): assert actions[0].HasField('createTimer') retry_timer_2 = actions[0].createTimer - # Retry timer 2 must ALSO point to the first child's instance ID + # Retry timer 2 must ALSO point to the FIRST child's instance ID, + # NOT the second child's ID. assert retry_timer_2.childWorkflowRetry.instanceId == expected_first_child_id + assert retry_timer_2.childWorkflowRetry.instanceId != expected_second_child_id def get_and_validate_single_complete_workflow_action( From 9485df641af1e4df92b316136d805eff4b12157c Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:10:41 +0200 Subject: [PATCH 04/13] ruff Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/internal/helpers.py | 1 + .../dapr/ext/workflow/_durabletask/worker.py | 10 ++++------ .../tests/durabletask/test_orchestration_executor.py | 4 +--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index c296d9ec..d8dc888b 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -71,6 +71,7 @@ def is_optional_timer_event(event: pb.HistoryEvent) -> bool: and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos ) + # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index f0b385da..cf6826a5 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1681,9 +1681,8 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # If the pending action at this id is an optional timer from an # indefinite wait_for_external_event that wasn't present in the # pre-patch history, drop it and shift so this schedule matches. - if ( - task_id in ctx._pending_actions - and ph.is_optional_timer_action(ctx._pending_actions[task_id]) + if task_id in ctx._pending_actions and ph.is_optional_timer_action( + ctx._pending_actions[task_id] ): ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) @@ -1768,9 +1767,8 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # Remove the childWorkflowInstanceCreated event from the pending action list so we don't schedule it again. task_id = event.eventId # If the pending action at this id is an optional timer, drop+shift. - if ( - task_id in ctx._pending_actions - and ph.is_optional_timer_action(ctx._pending_actions[task_id]) + if task_id in ctx._pending_actions and ph.is_optional_timer_action( + ctx._pending_actions[task_id] ): ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 77c11825..0b14103e 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2046,9 +2046,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), helpers.new_child_workflow_created_event(1, 'child_orchestrator', child_instance_id), ] - new_events = [ - helpers.new_child_workflow_completed_event(1, json.dumps('child result')) - ] + new_events = [helpers.new_child_workflow_completed_event(1, json.dumps('child result'))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions From 03558b8ac4c8a2d56559d14e55b3482de853c588 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:12:44 +0200 Subject: [PATCH 05/13] Addressed review comments Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/task.py | 32 +++++++++++++++++-- .../dapr/ext/workflow/_durabletask/worker.py | 14 +++++--- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index d09f29dc..a2895399 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -401,6 +401,19 @@ def fail(self, message: str, details: pb.TaskFailureDetails): if self._parent is not None: self._parent.on_child_completed(self) + def cancel(self, exc: Exception) -> None: + """Mark this task as completed with the given exception. + + Used for non-runtime failures such as a zero-timeout + wait_for_external_event being canceled before any history event arrives. + """ + if self._is_complete: + raise ValueError('The task has already completed.') + self._exception = exc + self._is_complete = True + if self._parent is not None: + self._parent.on_child_completed(self) + class RetryableTask(CompletableTask[T]): """A task that can be retried according to a retry policy.""" @@ -505,9 +518,17 @@ class ExternalEventWithTimeoutTask(CompositeTask[T]): ``TimeoutError`` if the timer fires first. """ - def __init__(self, event_task: CompletableTask, timer_task: TimerTask): + def __init__( + self, + event_task: CompletableTask, + timer_task: TimerTask, + event_name: str, + timeout: Optional[Union[datetime, timedelta]] = None, + ): self._event_task = event_task self._timer_task = timer_task + self._event_name = event_name + self._timeout = timeout super().__init__([event_task, timer_task]) def on_child_completed(self, completed_task: Task): @@ -520,7 +541,14 @@ def on_child_completed(self, completed_task: Task): self._result = completed_task.get_result() self._is_complete = True elif completed_task is self._timer_task: - self._exception = TimeoutError('The operation timed out waiting for an external event') + if self._timeout is not None: + msg = ( + f'Timed out after {self._timeout!r} waiting for ' + f'external event {self._event_name!r}' + ) + else: + msg = f'Timed out waiting for external event {self._event_name!r}' + self._exception = TimeoutError(msg) self._is_complete = True if self._is_complete and self._parent is not None: self._parent.on_child_completed(self) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index cf6826a5..a4899586 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1432,9 +1432,10 @@ def wait_for_external_event( pending.remove(external_event_task) if not pending: del self._pending_events[event_name] - external_event_task._is_complete = True - external_event_task._exception = TimeoutError( - 'The operation timed out waiting for an external event' + external_event_task.cancel( + TimeoutError( + f'Wait for external event {name!r} canceled immediately due to zero timeout' + ) ) return external_event_task @@ -1451,7 +1452,12 @@ def wait_for_external_event( fire_at, origin=pb.TimerOriginExternalEvent(name=name), ) - return task.ExternalEventWithTimeoutTask(external_event_task, timer_task) + return task.ExternalEventWithTimeoutTask( + external_event_task, + timer_task, + event_name=name, + timeout=None if is_indefinite else timeout, + ) def continue_as_new(self, new_input, *, save_events: bool = False) -> None: if self._is_complete: From 6c4e337b7b2deb44d8a27b27fe733d8b6d8dc6ed Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:19:22 +0200 Subject: [PATCH 06/13] Fix ci Signed-off-by: Albert Callarisa --- dapr/clients/grpc/_helpers.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dapr/clients/grpc/_helpers.py b/dapr/clients/grpc/_helpers.py index 8eb9a1e9..1ab1f9e6 100644 --- a/dapr/clients/grpc/_helpers.py +++ b/dapr/clients/grpc/_helpers.py @@ -14,10 +14,11 @@ """ from enum import Enum -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union, cast from google.protobuf import json_format from google.protobuf.any_pb2 import Any as GrpcAny +from google.protobuf.descriptor import Descriptor from google.protobuf.message import Message as GrpcMessage from google.protobuf.struct_pb2 import Struct from google.protobuf.wrappers_pb2 import ( @@ -63,7 +64,10 @@ def unpack(data: GrpcAny, message: GrpcMessage) -> None: """ if not isinstance(message, GrpcMessage): raise ValueError('output message is not protocol buffer message object') - if not data.Is(message.DESCRIPTOR): + # cast: newer types-grpcio stubs declare ``message.DESCRIPTOR`` as a union of + # ``Descriptor`` and the C-extension ``_upb._message.Descriptor``. Both are + # accepted at runtime by ``Any.Is``; the cast narrows the type for mypy. + if not data.Is(cast(Descriptor, message.DESCRIPTOR)): raise ValueError(f'invalid type. serialized message type: {data.type_url}') data.Unpack(message) From ad30808e7ea9cdbc7eb0dc86935acf894748fcbe Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:21:33 +0200 Subject: [PATCH 07/13] Addressed review comments Signed-off-by: Albert Callarisa --- .../workflow/_durabletask/internal/helpers.py | 20 ++++++++----- .../dapr/ext/workflow/_durabletask/task.py | 12 ++++++-- .../dapr/ext/workflow/workflow_context.py | 12 ++++++-- .../test_orchestration_executor.py | 29 +++++++++++++++++++ 4 files changed, 62 insertions(+), 11 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index d8dc888b..662de955 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -59,17 +59,23 @@ def is_optional_timer_action(action: pb.WorkflowAction) -> bool: def is_optional_timer_event(event: pb.HistoryEvent) -> bool: - """Returns True if a TimerCreatedEvent is the optional TimerOriginExternalEvent - sentinel timer.""" + """Returns True if a TimerCreatedEvent is the optional sentinel timer. + + For replay compatibility, treat a timerCreated event with the sentinel + fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when + reading histories emitted by older sidecars that didn't populate it). When + ``origin`` *is* populated, it must match TimerOriginExternalEvent. + """ if not event.HasField('timerCreated'): return False timer = event.timerCreated - if timer.WhichOneof('origin') != 'externalEvent': + if ( + timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds + or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos + ): return False - return ( - timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds - and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos - ) + origin = timer.WhichOneof('origin') + return origin in (None, 'externalEvent') # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index a2895399..8e4e2a76 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -177,8 +177,16 @@ def wait_for_external_event( name : str The event name of the event that the task is waiting for. timeout : datetime | timedelta | None - Optional deadline or duration after which a ``TimeoutError`` is raised - if the event has not been received. + Controls how long to wait for the event. Three shapes: + + * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. + An optional sentinel timer is scheduled internally for runtime + tracking, but ``TimeoutError`` is never raised on its own. + * ``timedelta(0)`` — do not wait at all. The returned task fails + immediately with ``TimeoutError``. + * A future ``datetime`` or a positive ``timedelta`` — wait until + that deadline / for that duration; ``TimeoutError`` is raised if + the event has not been received in time. Returns ------- diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index 6f5610e2..04b5342d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -175,8 +175,16 @@ def wait_for_external_event( name : str The event name of the event that the task is waiting for. timeout : datetime | timedelta | None - Optional deadline or duration after which a ``TimeoutError`` is raised - if the event has not been received. + Controls how long to wait for the event. Three shapes: + + * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. + An optional sentinel timer is scheduled internally for runtime + tracking, but ``TimeoutError`` is never raised on its own. + * ``timedelta(0)`` — do not wait at all. The returned task fails + immediately with ``TimeoutError``. + * A future ``datetime`` or a positive ``timedelta`` — wait until + that deadline / for that duration; ``TimeoutError`` is raised if + the event has not been received in time. Returns ------- diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 0b14103e..3e2d3e74 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -1984,6 +1984,35 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == json.dumps('ok') +def test_post_patch_replay_optional_timer_with_unset_origin(): + """An older sidecar may emit TimerCreatedEvent without populating the proto3 + ``origin`` oneof. The sentinel fireAt alone must still classify the event as + optional so the replay matches our pending optional timer cleanly.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + # Sentinel fireAt but no origin set. + helpers.new_timer_created_event(1, helpers.OPTIONAL_TIMER_FIRE_AT), + ] + new_events = [helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('ok') + + def test_pre_patch_replay_indefinite_wait_then_activity(): """A pre-patch history has the activity scheduled at id=1 (no reserved id for the indefinite wait). The replay must drop the optional timer, shift the From 51222cfce640a76cc36d5371abbbed84122f4f47 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:33:13 +0200 Subject: [PATCH 08/13] Addressed review comments Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index a4899586..d5ac88b5 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1442,11 +1442,13 @@ def wait_for_external_event( is_indefinite = timeout is None or ( isinstance(timeout, timedelta) and timeout < timedelta(0) ) - fire_at: Union[datetime, timestamp_pb2.Timestamp] + fire_at: Union[datetime, timedelta, timestamp_pb2.Timestamp] if is_indefinite: fire_at = ph.OPTIONAL_TIMER_FIRE_AT else: - fire_at = timeout # type: ignore[assignment] + # Narrowed by ``is_indefinite``: timeout is a positive timedelta or a datetime here. + assert timeout is not None + fire_at = timeout timer_task = self.create_timer_internal( fire_at, From 461620e3f743622b2b8d514cc23157215437c636 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 13 Apr 2026 17:44:25 +0200 Subject: [PATCH 09/13] Fix CI Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/worker.py | 11 ++++ .../test_orchestration_executor.py | 56 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index d5ac88b5..0bef4e53 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1649,6 +1649,17 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven and not ph.is_optional_timer_event(event) ): ctx._drop_optional_pending_at(timer_id) + pending = ctx._pending_actions.get(timer_id) + # Reverse asymmetric: the incoming TimerCreated is an optional timer + # from an older code version, but the current code has a different + # action at this id (or none at all). This happens when a patch adds + # new actions before the wait_for_external_event that originally + # produced the timer. Silently drop the stale optional event; the + # pending action stays in place to match its own future event. + if ph.is_optional_timer_event(event) and ( + pending is None or not pending.HasField('createTimer') + ): + return action = ctx._pending_actions.pop(timer_id, None) if not action: raise _get_non_determinism_error(timer_id, task.get_name(ctx.create_timer)) diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 3e2d3e74..ee9af508 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2013,6 +2013,62 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == json.dumps('ok') +def test_patch_adds_activity_before_existing_wait(): + """Reproduces the versioning.py test5/test6 scenario. + + An in-flight orchestration had a wait_for_external_event that emitted an + optional TimerCreated event into history. Later, the code was patched to add + an ``is_patched`` check + activity call *before* the wait. On replay, the + new orchestration code emits a ScheduleTask at the id that the optional + TimerCreated occupies in history. The runtime must silently skip the stale + optional TimerCreated event rather than raising a non-determinism error.""" + + def dummy_activity(ctx, input): + return f'did: {input}' + + def patched_orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.call_activity(dummy_activity, input='start') + # NEW: is_patched branch added before the existing wait. Schedules an + # activity at the id previously occupied by the optional timer. + if ctx.is_patched('patch1'): + yield ctx.call_activity(dummy_activity, input='patch1 is patched') + else: + yield ctx.call_activity(dummy_activity, input='patch1 is not patched') + result = yield ctx.wait_for_external_event('evt') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(patched_orchestrator) + registry.add_activity(dummy_activity) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: start activity at id=1, optional timer at id=2. + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + helpers.new_task_completed_event(1, json.dumps('did: start')), + helpers.new_timer_created_event( + 2, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='evt'), + ), + ] + # Event arrives (triggers replay with the new code path). + new_events = [helpers.new_event_raised_event('evt', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # Replay must NOT fail. It must emit the new patch1-not-patched activity as + # a pending action (id=2 — the slot the stale optional timer is skipped from). + # is_patched returns False during replay because the patch is not recorded in + # the original history's workflowStarted patches. + assert len(actions) == 1 + assert actions[0].HasField('scheduleTask') + assert actions[0].scheduleTask.input.value == json.dumps('patch1 is not patched') + + def test_pre_patch_replay_indefinite_wait_then_activity(): """A pre-patch history has the activity scheduled at id=1 (no reserved id for the indefinite wait). The replay must drop the optional timer, shift the From 5a794042d7b270d4e8b6bd6178ec012b926895d5 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 14 Apr 2026 10:10:44 +0200 Subject: [PATCH 10/13] Remove pending external_event_task when timeout fires Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/task.py | 9 ++++ .../dapr/ext/workflow/_durabletask/worker.py | 11 +++++ .../test_orchestration_executor.py | 42 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index 8e4e2a76..43f6aee6 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -524,6 +524,11 @@ class ExternalEventWithTimeoutTask(CompositeTask[T]): Completes with the event data if the event arrives first, or raises ``TimeoutError`` if the timer fires first. + + When the timer wins, the optional ``on_timeout`` callback is invoked so the + caller can unregister the stale event task from ``_pending_events``, ensuring + that a late ``eventRaised`` for the same name is buffered rather than consumed + by the now-obsolete waiter. """ def __init__( @@ -532,11 +537,13 @@ def __init__( timer_task: TimerTask, event_name: str, timeout: Optional[Union[datetime, timedelta]] = None, + on_timeout: Optional[Callable[[], None]] = None, ): self._event_task = event_task self._timer_task = timer_task self._event_name = event_name self._timeout = timeout + self._on_timeout = on_timeout super().__init__([event_task, timer_task]) def on_child_completed(self, completed_task: Task): @@ -549,6 +556,8 @@ def on_child_completed(self, completed_task: Task): self._result = completed_task.get_result() self._is_complete = True elif completed_task is self._timer_task: + if self._on_timeout is not None: + self._on_timeout() if self._timeout is not None: msg = ( f'Timed out after {self._timeout!r} waiting for ' diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 0bef4e53..6d74131c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1454,11 +1454,22 @@ def wait_for_external_event( fire_at, origin=pb.TimerOriginExternalEvent(name=name), ) + + def _unregister_stale_event_task() -> None: + """Remove the event task from _pending_events so late eventRaised + messages are buffered instead of consumed by this timed-out waiter.""" + pending = self._pending_events.get(event_name) + if pending and external_event_task in pending: + pending.remove(external_event_task) + if not pending: + del self._pending_events[event_name] + return task.ExternalEventWithTimeoutTask( external_event_task, timer_task, event_name=name, timeout=None if is_indefinite else timeout, + on_timeout=_unregister_stale_event_task, ) def continue_as_new(self, new_input, *, save_events: bool = False) -> None: diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index ee9af508..747fa487 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -1893,6 +1893,48 @@ def timeout_orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == json.dumps('got: hello') +def test_wait_for_external_event_timeout_cleans_up_pending_event(): + """When the timeout timer fires first, the stale event task is unregistered + from _pending_events so that a subsequent wait_for_external_event for the + same name can observe a later event rather than having it consumed by the + timed-out waiter.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=5)) + except TimeoutError: + pass + # Second wait for the same event name — must pick up the late event. + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=60)) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + fire_at = start_time + timedelta(seconds=5) + + # First wait creates timer at id=1, fires. Second wait creates timer at id=2. + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, fire_at), + helpers.new_timer_fired_event(1, fire_at), + helpers.new_timer_created_event(2, start_time + timedelta(seconds=60)), + ] + # The late event arrives during the second wait. + new_events = [ + helpers.new_event_raised_event('myEvent', json.dumps('late hello')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('late hello') + + def test_wait_for_external_event_indefinite_emits_optional_timer(): """WaitForExternalEvent with no timeout (or a negative timeout) emits an optional timer whose fireAt is the exact sentinel 9999-12-31T23:59:59.999999999Z.""" From 7805f97ba68759906876a697adb1ab7b31ca6476 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 14 Apr 2026 11:31:45 +0200 Subject: [PATCH 11/13] Addressed review comments Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/worker.py | 2 +- .../test_orchestration_executor.py | 49 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 6d74131c..b79f7eb8 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -1668,7 +1668,7 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # produced the timer. Silently drop the stale optional event; the # pending action stays in place to match its own future event. if ph.is_optional_timer_event(event) and ( - pending is None or not pending.HasField('createTimer') + pending is None or not ph.is_optional_timer_action(pending) ): return action = ctx._pending_actions.pop(timer_id, None) diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 747fa487..e17fcb04 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2221,6 +2221,55 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == json.dumps('done') +def test_stale_optional_timer_event_does_not_match_user_timer(): + """A stale optional TimerCreated event in history must not be treated as + confirmation of a non-optional user CreateTimer that now occupies the same id. + + Scenario: older code had an indefinite wait_for_external_event (optional timer + at id=1). A patch replaced the wait with a user CreateTimer at the same id. + The stale optional TimerCreated must be dropped; the user timer must remain + pending and match its own (non-optional) TimerCreated on a future replay.""" + + def patched_orchestrator(ctx: task.OrchestrationContext, _): + # New code: user timer at id=1 (replaces the old indefinite wait). + yield ctx.create_timer(timedelta(seconds=10)) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(patched_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + user_fire_at = start_time + timedelta(seconds=10) + # History from the old code: optional timer at id=1 (stale). + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='evt'), + ), + ] + # New events: the runtime confirms and fires the real user timer at id=1. + new_events = [ + helpers.new_timer_created_event( + 1, + user_fire_at, + origin=pb.TimerOriginCreateTimer(), + ), + helpers.new_timer_fired_event(1, user_fire_at), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # The stale optional event must have been dropped. The real user timer must + # have been confirmed and fired, completing the orchestration. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('done') + + def test_pre_patch_replay_two_indefinite_waits(): """Two indefinite waits in sequence. Shifts must compose across multiple optional timers.""" From eae66558c6aacd58acf7ef7256b2e0ea11947e99 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Wed, 15 Apr 2026 15:27:57 +0200 Subject: [PATCH 12/13] Addressed review comments Signed-off-by: Albert Callarisa --- .../workflow/_durabletask/internal/helpers.py | 117 ++------------ .../workflow/_durabletask/internal/timer.py | 145 ++++++++++++++++++ .../dapr/ext/workflow/_durabletask/task.py | 22 +-- .../dapr/ext/workflow/workflow_context.py | 4 +- 4 files changed, 170 insertions(+), 118 deletions(-) create mode 100644 ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index 662de955..70c41c60 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -11,72 +11,19 @@ import traceback from datetime import datetime -from typing import Optional, Union +from typing import Optional import dapr.ext.workflow._durabletask.internal.protos as pb -from google.protobuf import timestamp_pb2, wrappers_pb2 - -TimerOrigin = Union[ - pb.TimerOriginCreateTimer, - pb.TimerOriginExternalEvent, - pb.TimerOriginActivityRetry, - pb.TimerOriginChildWorkflowRetry, -] - -_ORIGIN_FIELD: dict[type, str] = { - pb.TimerOriginCreateTimer: 'createTimer', - pb.TimerOriginExternalEvent: 'externalEvent', - pb.TimerOriginActivityRetry: 'activityRetry', - pb.TimerOriginChildWorkflowRetry: 'childWorkflowRetry', -} - -# Sentinel fireAt used for "optional" TimerOriginExternalEvent timers that back an -# indefinite wait_for_external_event. The sentinel is 9999-12-31T23:59:59.999999999Z -# (nanosecond precision — cannot be represented with Python's datetime, which only -# supports microseconds, so we build the Timestamp directly). -OPTIONAL_TIMER_FIRE_AT: timestamp_pb2.Timestamp = timestamp_pb2.Timestamp( - seconds=253402300799, nanos=999999999 +from dapr.ext.workflow._durabletask.internal.timer import ( # noqa: F401 + OPTIONAL_TIMER_FIRE_AT, + TimerOrigin, + is_optional_timer_action, + is_optional_timer_event, + new_create_timer_action, + new_timer_created_event, + new_timer_fired_event, ) - - -def is_optional_timer_action(action: pb.WorkflowAction) -> bool: - """Returns True if the action is an optional TimerOriginExternalEvent timer - with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event. - - Pre-patch histories (from prior SDK versions that didn't schedule a timer for - indefinite waits) won't carry a matching TimerCreatedEvent; the replay logic - uses this check to drop the optional action and shift sequence ids. - """ - if not action.HasField('createTimer'): - return False - timer = action.createTimer - if timer.WhichOneof('origin') != 'externalEvent': - return False - return ( - timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds - and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos - ) - - -def is_optional_timer_event(event: pb.HistoryEvent) -> bool: - """Returns True if a TimerCreatedEvent is the optional sentinel timer. - - For replay compatibility, treat a timerCreated event with the sentinel - fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when - reading histories emitted by older sidecars that didn't populate it). When - ``origin`` *is* populated, it must match TimerOriginExternalEvent. - """ - if not event.HasField('timerCreated'): - return False - timer = event.timerCreated - if ( - timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds - or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos - ): - return False - origin = timer.WhichOneof('origin') - return origin in (None, 'externalEvent') - +from google.protobuf import timestamp_pb2, wrappers_pb2 # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere @@ -102,34 +49,6 @@ def new_execution_started_event( ) -def new_timer_created_event( - timer_id: int, - fire_at: Union[datetime, timestamp_pb2.Timestamp], - origin: Optional[TimerOrigin] = None, -) -> pb.HistoryEvent: - if isinstance(fire_at, timestamp_pb2.Timestamp): - ts = fire_at - else: - ts = timestamp_pb2.Timestamp() - ts.FromDatetime(fire_at) - origin_kwargs = {_ORIGIN_FIELD[type(origin)]: origin} if origin is not None else {} - return pb.HistoryEvent( - eventId=timer_id, - timestamp=timestamp_pb2.Timestamp(), - timerCreated=pb.TimerCreatedEvent(fireAt=ts, **origin_kwargs), - ) - - -def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent: - ts = timestamp_pb2.Timestamp() - ts.FromDatetime(fire_at) - return pb.HistoryEvent( - eventId=-1, - timestamp=timestamp_pb2.Timestamp(), - timerFired=pb.TimerFiredEvent(fireAt=ts, timerId=timer_id), - ) - - def new_task_scheduled_event( event_id: int, name: str, encoded_input: Optional[str] = None ) -> pb.HistoryEvent: @@ -272,22 +191,6 @@ def new_workflow_version_not_available_action( ) -def new_create_timer_action( - id: int, - fire_at: Union[datetime, timestamp_pb2.Timestamp], - origin: Optional[TimerOrigin] = None, -) -> pb.WorkflowAction: - if isinstance(fire_at, timestamp_pb2.Timestamp): - timestamp = fire_at - else: - timestamp = timestamp_pb2.Timestamp() - timestamp.FromDatetime(fire_at) - origin_kwargs = {_ORIGIN_FIELD[type(origin)]: origin} if origin is not None else {} - return pb.WorkflowAction( - id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp, **origin_kwargs) - ) - - def new_schedule_task_action( id: int, name: str, diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py new file mode 100644 index 00000000..b78d03e3 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py @@ -0,0 +1,145 @@ +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Timer-related helpers for the durabletask workflow SDK. + +This module keeps all timer / TimerOrigin concerns in one place, separate from +the generic action/event helpers in ``helpers.py``. +""" + +from datetime import datetime +from typing import Optional, Union + +import dapr.ext.workflow._durabletask.internal.protos as pb +from google.protobuf import timestamp_pb2 + +TimerOrigin = Union[ + pb.TimerOriginCreateTimer, + pb.TimerOriginExternalEvent, + pb.TimerOriginActivityRetry, + pb.TimerOriginChildWorkflowRetry, +] + +_ORIGIN_FIELD: dict[type, str] = { + pb.TimerOriginCreateTimer: 'createTimer', + pb.TimerOriginExternalEvent: 'externalEvent', + pb.TimerOriginActivityRetry: 'activityRetry', + pb.TimerOriginChildWorkflowRetry: 'childWorkflowRetry', +} + +# Sentinel fireAt used for "optional" TimerOriginExternalEvent timers that back +# an indefinite wait_for_external_event. The sentinel is +# 9999-12-31T23:59:59.999999999Z (nanosecond precision — Python's ``datetime`` +# is only microsecond-precision, so we build the Timestamp directly). +OPTIONAL_TIMER_FIRE_AT: timestamp_pb2.Timestamp = timestamp_pb2.Timestamp( + seconds=253402300799, nanos=999999999 +) + + +def _origin_kwargs(origin: Optional[TimerOrigin]) -> dict: + """Build keyword args that set the correct origin oneof field on a + ``CreateTimerAction`` / ``TimerCreatedEvent`` proto.""" + if origin is None: + return {} + return {_ORIGIN_FIELD[type(origin)]: origin} + + +def _to_timestamp( + fire_at: Union[datetime, timestamp_pb2.Timestamp], +) -> timestamp_pb2.Timestamp: + """Normalize a ``fire_at`` argument to a protobuf ``Timestamp``.""" + if isinstance(fire_at, timestamp_pb2.Timestamp): + return fire_at + ts = timestamp_pb2.Timestamp() + ts.FromDatetime(fire_at) + return ts + + +def _fire_at_matches_sentinel(ts: timestamp_pb2.Timestamp) -> bool: + """Return True iff ``ts`` exactly matches :data:`OPTIONAL_TIMER_FIRE_AT`. + + Both ``seconds`` and ``nanos`` are proto3 scalars that are always present + (default 0 when unset), so unconditional field access is safe. + """ + return ( + ts.seconds == OPTIONAL_TIMER_FIRE_AT.seconds + and ts.nanos == OPTIONAL_TIMER_FIRE_AT.nanos + ) + + +def is_optional_timer_action(action: pb.WorkflowAction) -> bool: + """Returns True if the action is an optional TimerOriginExternalEvent timer + with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event. + + Pre-patch histories (from prior SDK versions that didn't schedule a timer + for indefinite waits) won't carry a matching TimerCreatedEvent; the replay + logic uses this check to drop the optional action and shift sequence ids. + """ + if not action.HasField('createTimer'): + return False + timer = action.createTimer + if timer.WhichOneof('origin') != 'externalEvent': + return False + return _fire_at_matches_sentinel(timer.fireAt) + + +def is_optional_timer_event(event: pb.HistoryEvent) -> bool: + """Returns True if a TimerCreatedEvent is the optional sentinel timer. + + For replay compatibility, treat a timerCreated event with the sentinel + fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when + reading histories emitted by older sidecars that didn't populate it). When + ``origin`` *is* populated, it must match TimerOriginExternalEvent. + """ + if not event.HasField('timerCreated'): + return False + timer = event.timerCreated + if not _fire_at_matches_sentinel(timer.fireAt): + return False + origin = timer.WhichOneof('origin') + return origin in (None, 'externalEvent') + + +def new_create_timer_action( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, +) -> pb.WorkflowAction: + ts = _to_timestamp(fire_at) + return pb.WorkflowAction( + id=timer_id, + createTimer=pb.CreateTimerAction(fireAt=ts, **_origin_kwargs(origin)), + ) + + +def new_timer_created_event( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, +) -> pb.HistoryEvent: + ts = _to_timestamp(fire_at) + return pb.HistoryEvent( + eventId=timer_id, + timestamp=timestamp_pb2.Timestamp(), + timerCreated=pb.TimerCreatedEvent(fireAt=ts, **_origin_kwargs(origin)), + ) + + +def new_timer_fired_event( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], +) -> pb.HistoryEvent: + ts = _to_timestamp(fire_at) + return pb.HistoryEvent( + eventId=-1, + timestamp=timestamp_pb2.Timestamp(), + timerFired=pb.TimerFiredEvent(fireAt=ts, timerId=timer_id), + ) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index 43f6aee6..16ba7946 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -177,7 +177,9 @@ def wait_for_external_event( name : str The event name of the event that the task is waiting for. timeout : datetime | timedelta | None - Controls how long to wait for the event. Three shapes: + Controls how long to wait for the event. Accepts only ``datetime`` + or ``timedelta`` — a plain numeric ``0`` is **not** a valid value + (use ``timedelta(0)`` explicitly). Three shapes: * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. An optional sentinel timer is scheduled internally for runtime @@ -414,9 +416,13 @@ def cancel(self, exc: Exception) -> None: Used for non-runtime failures such as a zero-timeout wait_for_external_event being canceled before any history event arrives. + + Idempotent: a noop if the task has already completed (success or + failure), which matches the common semantics of ``cancel()`` in + asyncio / threading APIs. """ if self._is_complete: - raise ValueError('The task has already completed.') + return self._exception = exc self._is_complete = True if self._parent is not None: @@ -558,14 +564,10 @@ def on_child_completed(self, completed_task: Task): elif completed_task is self._timer_task: if self._on_timeout is not None: self._on_timeout() - if self._timeout is not None: - msg = ( - f'Timed out after {self._timeout!r} waiting for ' - f'external event {self._event_name!r}' - ) - else: - msg = f'Timed out waiting for external event {self._event_name!r}' - self._exception = TimeoutError(msg) + after = f' after {self._timeout!r}' if self._timeout is not None else '' + self._exception = TimeoutError( + f'Timed out{after} waiting for external event {self._event_name!r}' + ) self._is_complete = True if self._is_complete and self._parent is not None: self._parent.on_child_completed(self) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index 04b5342d..af31e84d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -175,7 +175,9 @@ def wait_for_external_event( name : str The event name of the event that the task is waiting for. timeout : datetime | timedelta | None - Controls how long to wait for the event. Three shapes: + Controls how long to wait for the event. Accepts only ``datetime`` + or ``timedelta`` — a plain numeric ``0`` is **not** a valid value + (use ``timedelta(0)`` explicitly). Three shapes: * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. An optional sentinel timer is scheduled internally for runtime From bd9297a8687b2dbbaac5abafec1dec4fdeb8a11e Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Wed, 15 Apr 2026 15:39:28 +0200 Subject: [PATCH 13/13] Lint Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/_durabletask/internal/timer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py index b78d03e3..e04c4da0 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py @@ -69,10 +69,7 @@ def _fire_at_matches_sentinel(ts: timestamp_pb2.Timestamp) -> bool: Both ``seconds`` and ``nanos`` are proto3 scalars that are always present (default 0 when unset), so unconditional field access is safe. """ - return ( - ts.seconds == OPTIONAL_TIMER_FIRE_AT.seconds - and ts.nanos == OPTIONAL_TIMER_FIRE_AT.nanos - ) + return ts.seconds == OPTIONAL_TIMER_FIRE_AT.seconds and ts.nanos == OPTIONAL_TIMER_FIRE_AT.nanos def is_optional_timer_action(action: pb.WorkflowAction) -> bool: