diff --git a/dapr/clients/grpc/_helpers.py b/dapr/clients/grpc/_helpers.py index df71083d..1ab1f9e6 100644 --- a/dapr/clients/grpc/_helpers.py +++ b/dapr/clients/grpc/_helpers.py @@ -64,6 +64,9 @@ def unpack(data: GrpcAny, message: GrpcMessage) -> None: """ if not isinstance(message, GrpcMessage): raise ValueError('output message is not protocol buffer message object') + # cast: newer types-grpcio stubs declare ``message.DESCRIPTOR`` as a union of + # ``Descriptor`` and the C-extension ``_upb._message.Descriptor``. Both are + # accepted at runtime by ``Any.Is``; the cast narrows the type for mypy. if not data.Is(cast(Descriptor, message.DESCRIPTOR)): raise ValueError(f'invalid type. serialized message type: {data.type_url}') data.Unpack(message) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py index ebfb3402..70c41c60 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py @@ -14,6 +14,15 @@ from typing import Optional import dapr.ext.workflow._durabletask.internal.protos as pb +from dapr.ext.workflow._durabletask.internal.timer import ( # noqa: F401 + OPTIONAL_TIMER_FIRE_AT, + TimerOrigin, + is_optional_timer_action, + is_optional_timer_event, + new_create_timer_action, + new_timer_created_event, + new_timer_fired_event, +) from google.protobuf import timestamp_pb2, wrappers_pb2 # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere @@ -40,26 +49,6 @@ def new_execution_started_event( ) -def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent: - ts = timestamp_pb2.Timestamp() - ts.FromDatetime(fire_at) - return pb.HistoryEvent( - eventId=timer_id, - timestamp=timestamp_pb2.Timestamp(), - timerCreated=pb.TimerCreatedEvent(fireAt=ts), - ) - - -def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent: - ts = timestamp_pb2.Timestamp() - ts.FromDatetime(fire_at) - return pb.HistoryEvent( - eventId=-1, - timestamp=timestamp_pb2.Timestamp(), - timerFired=pb.TimerFiredEvent(fireAt=ts, timerId=timer_id), - ) - - def new_task_scheduled_event( event_id: int, name: str, encoded_input: Optional[str] = None ) -> pb.HistoryEvent: @@ -202,12 +191,6 @@ def new_workflow_version_not_available_action( ) -def new_create_timer_action(id: int, fire_at: datetime) -> pb.WorkflowAction: - timestamp = timestamp_pb2.Timestamp() - timestamp.FromDatetime(fire_at) - return pb.WorkflowAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp)) - - def new_schedule_task_action( id: int, name: str, diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py index 7a5dea47..feb7e313 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py @@ -27,7 +27,7 @@ from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14history_events.proto\x1a\x13orchestration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xd6\x03\n\x15\x45xecutionStartedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12+\n\x0eparentInstance\x18\x05 \x01(\x0b\x32\x13.ParentInstanceInfo\x12;\n\x17scheduledStartTimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12)\n\x12parentTraceContext\x18\x07 \x01(\x0b\x32\r.TraceContext\x12\x34\n\x0eworkflowSpanID\x18\x08 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\t \x03(\x0b\x32 .ExecutionStartedEvent.TagsEntry\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa2\x01\n\x17\x45xecutionCompletedEvent\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x03 \x01(\x0b\x32\x13.TaskFailureDetails\"X\n\x18\x45xecutionTerminatedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x02 \x01(\x08\"\x9e\x02\n\x12TaskScheduledEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x04 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\t\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"t\n\x12TaskCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"p\n\x0fTaskFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"\xa8\x02\n!ChildWorkflowInstanceCreatedEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x05 \x01(\x0b\x32\r.TraceContext\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"l\n#ChildWorkflowInstanceCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"h\n ChildWorkflowInstanceFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\"\x18\n\x16TimerOriginCreateTimer\"(\n\x18TimerOriginExternalEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\"\xa5\x02\n\x11TimerCreatedEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12>\n\x17rerunParentInstanceInfo\x18\x03 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x02\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x05 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x42\x08\n\x06originB\x07\n\x05_nameB\x1a\n\x18_rerunParentInstanceInfo\"N\n\x0fTimerFiredEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07timerId\x18\x02 \x01(\x05\"J\n\x14WorkflowStartedEvent\x12&\n\x07version\x18\x01 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\x18\n\x16WorkflowCompletedEvent\"_\n\x0e\x45ventSentEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"M\n\x10\x45ventRaisedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x05input\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"A\n\x12\x43ontinueAsNewEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"F\n\x17\x45xecutionSuspendedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"D\n\x15\x45xecutionResumedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"a\n\x15\x45xecutionStalledEvent\x12\x1e\n\x06reason\x18\x01 \x01(\x0e\x32\x0e.StalledReason\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x0e\n\x0c_description\"\xa8\t\n\x0cHistoryEvent\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\x05\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x32\n\x10\x65xecutionStarted\x18\x03 \x01(\x0b\x32\x16.ExecutionStartedEventH\x00\x12\x36\n\x12\x65xecutionCompleted\x18\x04 \x01(\x0b\x32\x18.ExecutionCompletedEventH\x00\x12\x38\n\x13\x65xecutionTerminated\x18\x05 \x01(\x0b\x32\x19.ExecutionTerminatedEventH\x00\x12,\n\rtaskScheduled\x18\x06 \x01(\x0b\x32\x13.TaskScheduledEventH\x00\x12,\n\rtaskCompleted\x18\x07 \x01(\x0b\x32\x13.TaskCompletedEventH\x00\x12&\n\ntaskFailed\x18\x08 \x01(\x0b\x32\x10.TaskFailedEventH\x00\x12J\n\x1c\x63hildWorkflowInstanceCreated\x18\t \x01(\x0b\x32\".ChildWorkflowInstanceCreatedEventH\x00\x12N\n\x1e\x63hildWorkflowInstanceCompleted\x18\n \x01(\x0b\x32$.ChildWorkflowInstanceCompletedEventH\x00\x12H\n\x1b\x63hildWorkflowInstanceFailed\x18\x0b \x01(\x0b\x32!.ChildWorkflowInstanceFailedEventH\x00\x12*\n\x0ctimerCreated\x18\x0c \x01(\x0b\x32\x12.TimerCreatedEventH\x00\x12&\n\ntimerFired\x18\r \x01(\x0b\x32\x10.TimerFiredEventH\x00\x12\x30\n\x0fworkflowStarted\x18\x0e \x01(\x0b\x32\x15.WorkflowStartedEventH\x00\x12\x34\n\x11workflowCompleted\x18\x0f \x01(\x0b\x32\x17.WorkflowCompletedEventH\x00\x12$\n\teventSent\x18\x10 \x01(\x0b\x32\x0f.EventSentEventH\x00\x12(\n\x0b\x65ventRaised\x18\x11 \x01(\x0b\x32\x11.EventRaisedEventH\x00\x12,\n\rcontinueAsNew\x18\x14 \x01(\x0b\x32\x13.ContinueAsNewEventH\x00\x12\x36\n\x12\x65xecutionSuspended\x18\x15 \x01(\x0b\x32\x18.ExecutionSuspendedEventH\x00\x12\x32\n\x10\x65xecutionResumed\x18\x16 \x01(\x0b\x32\x16.ExecutionResumedEventH\x00\x12\x32\n\x10\x65xecutionStalled\x18\x1f \x01(\x0b\x32\x16.ExecutionStalledEventH\x00\x12 \n\x06router\x18\x1e \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x0b\n\teventTypeB\t\n\x07_routerJ\x04\x08\x12\x10\x13J\x04\x08\x13\x10\x14J\x04\x08\x17\x10\x18J\x04\x08\x18\x10\x19J\x04\x08\x19\x10\x1aJ\x04\x08\x1a\x10\x1bJ\x04\x08\x1b\x10\x1cJ\x04\x08\x1c\x10\x1dJ\x04\x08\x1d\x10\x1e\x42V\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14history_events.proto\x1a\x13orchestration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xd6\x03\n\x15\x45xecutionStartedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12+\n\x0eparentInstance\x18\x05 \x01(\x0b\x32\x13.ParentInstanceInfo\x12;\n\x17scheduledStartTimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12)\n\x12parentTraceContext\x18\x07 \x01(\x0b\x32\r.TraceContext\x12\x34\n\x0eworkflowSpanID\x18\x08 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\t \x03(\x0b\x32 .ExecutionStartedEvent.TagsEntry\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa2\x01\n\x17\x45xecutionCompletedEvent\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x03 \x01(\x0b\x32\x13.TaskFailureDetails\"X\n\x18\x45xecutionTerminatedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x02 \x01(\x08\"\x9e\x02\n\x12TaskScheduledEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x04 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\t\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"t\n\x12TaskCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"p\n\x0fTaskFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0ftaskExecutionId\x18\x03 \x01(\t\"\xa8\x02\n!ChildWorkflowInstanceCreatedEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12)\n\x12parentTraceContext\x18\x05 \x01(\x0b\x32\r.TraceContext\x12>\n\x17rerunParentInstanceInfo\x18\x06 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x00\x88\x01\x01\x42\x1a\n\x18_rerunParentInstanceInfo\"l\n#ChildWorkflowInstanceCompletedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"h\n ChildWorkflowInstanceFailedEvent\x12\x17\n\x0ftaskScheduledId\x18\x01 \x01(\x05\x12+\n\x0e\x66\x61ilureDetails\x18\x02 \x01(\x0b\x32\x13.TaskFailureDetails\"\x18\n\x16TimerOriginCreateTimer\"(\n\x18TimerOriginExternalEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\"3\n\x18TimerOriginActivityRetry\x12\x17\n\x0ftaskExecutionId\x18\x01 \x01(\t\"3\n\x1dTimerOriginChildWorkflowRetry\x12\x12\n\ninstanceId\x18\x01 \x01(\t\"\x97\x03\n\x11TimerCreatedEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12>\n\x17rerunParentInstanceInfo\x18\x03 \x01(\x0b\x32\x18.RerunParentInstanceInfoH\x02\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x05 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x12\x32\n\ractivityRetry\x18\x06 \x01(\x0b\x32\x19.TimerOriginActivityRetryH\x00\x12<\n\x12\x63hildWorkflowRetry\x18\x07 \x01(\x0b\x32\x1e.TimerOriginChildWorkflowRetryH\x00\x42\x08\n\x06originB\x07\n\x05_nameB\x1a\n\x18_rerunParentInstanceInfo\"N\n\x0fTimerFiredEvent\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07timerId\x18\x02 \x01(\x05\"J\n\x14WorkflowStartedEvent\x12&\n\x07version\x18\x01 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\x18\n\x16WorkflowCompletedEvent\"_\n\x0e\x45ventSentEvent\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"M\n\x10\x45ventRaisedEvent\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x05input\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"A\n\x12\x43ontinueAsNewEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"F\n\x17\x45xecutionSuspendedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"D\n\x15\x45xecutionResumedEvent\x12+\n\x05input\x18\x01 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"a\n\x15\x45xecutionStalledEvent\x12\x1e\n\x06reason\x18\x01 \x01(\x0e\x32\x0e.StalledReason\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x0e\n\x0c_description\"\xa8\t\n\x0cHistoryEvent\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\x05\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x32\n\x10\x65xecutionStarted\x18\x03 \x01(\x0b\x32\x16.ExecutionStartedEventH\x00\x12\x36\n\x12\x65xecutionCompleted\x18\x04 \x01(\x0b\x32\x18.ExecutionCompletedEventH\x00\x12\x38\n\x13\x65xecutionTerminated\x18\x05 \x01(\x0b\x32\x19.ExecutionTerminatedEventH\x00\x12,\n\rtaskScheduled\x18\x06 \x01(\x0b\x32\x13.TaskScheduledEventH\x00\x12,\n\rtaskCompleted\x18\x07 \x01(\x0b\x32\x13.TaskCompletedEventH\x00\x12&\n\ntaskFailed\x18\x08 \x01(\x0b\x32\x10.TaskFailedEventH\x00\x12J\n\x1c\x63hildWorkflowInstanceCreated\x18\t \x01(\x0b\x32\".ChildWorkflowInstanceCreatedEventH\x00\x12N\n\x1e\x63hildWorkflowInstanceCompleted\x18\n \x01(\x0b\x32$.ChildWorkflowInstanceCompletedEventH\x00\x12H\n\x1b\x63hildWorkflowInstanceFailed\x18\x0b \x01(\x0b\x32!.ChildWorkflowInstanceFailedEventH\x00\x12*\n\x0ctimerCreated\x18\x0c \x01(\x0b\x32\x12.TimerCreatedEventH\x00\x12&\n\ntimerFired\x18\r \x01(\x0b\x32\x10.TimerFiredEventH\x00\x12\x30\n\x0fworkflowStarted\x18\x0e \x01(\x0b\x32\x15.WorkflowStartedEventH\x00\x12\x34\n\x11workflowCompleted\x18\x0f \x01(\x0b\x32\x17.WorkflowCompletedEventH\x00\x12$\n\teventSent\x18\x10 \x01(\x0b\x32\x0f.EventSentEventH\x00\x12(\n\x0b\x65ventRaised\x18\x11 \x01(\x0b\x32\x11.EventRaisedEventH\x00\x12,\n\rcontinueAsNew\x18\x14 \x01(\x0b\x32\x13.ContinueAsNewEventH\x00\x12\x36\n\x12\x65xecutionSuspended\x18\x15 \x01(\x0b\x32\x18.ExecutionSuspendedEventH\x00\x12\x32\n\x10\x65xecutionResumed\x18\x16 \x01(\x0b\x32\x16.ExecutionResumedEventH\x00\x12\x32\n\x10\x65xecutionStalled\x18\x1f \x01(\x0b\x32\x16.ExecutionStalledEventH\x00\x12 \n\x06router\x18\x1e \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x0b\n\teventTypeB\t\n\x07_routerJ\x04\x08\x12\x10\x13J\x04\x08\x13\x10\x14J\x04\x08\x17\x10\x18J\x04\x08\x18\x10\x19J\x04\x08\x19\x10\x1aJ\x04\x08\x1a\x10\x1bJ\x04\x08\x1b\x10\x1cJ\x04\x08\x1c\x10\x1dJ\x04\x08\x1d\x10\x1e\x42V\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -61,26 +61,30 @@ _globals['_TIMERORIGINCREATETIMER']._serialized_end=1898 _globals['_TIMERORIGINEXTERNALEVENT']._serialized_start=1900 _globals['_TIMERORIGINEXTERNALEVENT']._serialized_end=1940 - _globals['_TIMERCREATEDEVENT']._serialized_start=1943 - _globals['_TIMERCREATEDEVENT']._serialized_end=2236 - _globals['_TIMERFIREDEVENT']._serialized_start=2238 - _globals['_TIMERFIREDEVENT']._serialized_end=2316 - _globals['_WORKFLOWSTARTEDEVENT']._serialized_start=2318 - _globals['_WORKFLOWSTARTEDEVENT']._serialized_end=2392 - _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_start=2394 - _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_end=2418 - _globals['_EVENTSENTEVENT']._serialized_start=2420 - _globals['_EVENTSENTEVENT']._serialized_end=2515 - _globals['_EVENTRAISEDEVENT']._serialized_start=2517 - _globals['_EVENTRAISEDEVENT']._serialized_end=2594 - _globals['_CONTINUEASNEWEVENT']._serialized_start=2596 - _globals['_CONTINUEASNEWEVENT']._serialized_end=2661 - _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_start=2663 - _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_end=2733 - _globals['_EXECUTIONRESUMEDEVENT']._serialized_start=2735 - _globals['_EXECUTIONRESUMEDEVENT']._serialized_end=2803 - _globals['_EXECUTIONSTALLEDEVENT']._serialized_start=2805 - _globals['_EXECUTIONSTALLEDEVENT']._serialized_end=2902 - _globals['_HISTORYEVENT']._serialized_start=2905 - _globals['_HISTORYEVENT']._serialized_end=4097 + _globals['_TIMERORIGINACTIVITYRETRY']._serialized_start=1942 + _globals['_TIMERORIGINACTIVITYRETRY']._serialized_end=1993 + _globals['_TIMERORIGINCHILDWORKFLOWRETRY']._serialized_start=1995 + _globals['_TIMERORIGINCHILDWORKFLOWRETRY']._serialized_end=2046 + _globals['_TIMERCREATEDEVENT']._serialized_start=2049 + _globals['_TIMERCREATEDEVENT']._serialized_end=2456 + _globals['_TIMERFIREDEVENT']._serialized_start=2458 + _globals['_TIMERFIREDEVENT']._serialized_end=2536 + _globals['_WORKFLOWSTARTEDEVENT']._serialized_start=2538 + _globals['_WORKFLOWSTARTEDEVENT']._serialized_end=2612 + _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_start=2614 + _globals['_WORKFLOWCOMPLETEDEVENT']._serialized_end=2638 + _globals['_EVENTSENTEVENT']._serialized_start=2640 + _globals['_EVENTSENTEVENT']._serialized_end=2735 + _globals['_EVENTRAISEDEVENT']._serialized_start=2737 + _globals['_EVENTRAISEDEVENT']._serialized_end=2814 + _globals['_CONTINUEASNEWEVENT']._serialized_start=2816 + _globals['_CONTINUEASNEWEVENT']._serialized_end=2881 + _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_start=2883 + _globals['_EXECUTIONSUSPENDEDEVENT']._serialized_end=2953 + _globals['_EXECUTIONRESUMEDEVENT']._serialized_start=2955 + _globals['_EXECUTIONRESUMEDEVENT']._serialized_end=3023 + _globals['_EXECUTIONSTALLEDEVENT']._serialized_start=3025 + _globals['_EXECUTIONSTALLEDEVENT']._serialized_end=3122 + _globals['_HISTORYEVENT']._serialized_start=3125 + _globals['_HISTORYEVENT']._serialized_end=4317 # @@protoc_insertion_point(module_scope) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi index 26ff5cfd..66ba6ba6 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi @@ -351,6 +351,44 @@ class TimerOriginExternalEvent(_message.Message): Global___TimerOriginExternalEvent: _TypeAlias = TimerOriginExternalEvent # noqa: Y015 +@_typing.final +class TimerOriginActivityRetry(_message.Message): + """Indicates the timer was created as a retry delay for an activity execution.""" + + DESCRIPTOR: _descriptor.Descriptor + + TASKEXECUTIONID_FIELD_NUMBER: _builtins.int + taskExecutionId: _builtins.str + """The task execution ID of the activity being retried.""" + def __init__( + self, + *, + taskExecutionId: _builtins.str = ..., + ) -> None: ... + _ClearFieldArgType: _TypeAlias = _typing.Literal["taskExecutionId", b"taskExecutionId"] # noqa: Y015 + def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + +Global___TimerOriginActivityRetry: _TypeAlias = TimerOriginActivityRetry # noqa: Y015 + +@_typing.final +class TimerOriginChildWorkflowRetry(_message.Message): + """Indicates the timer was created as a retry delay for a child workflow execution.""" + + DESCRIPTOR: _descriptor.Descriptor + + INSTANCEID_FIELD_NUMBER: _builtins.int + instanceId: _builtins.str + """The instance ID of the workflow being retried.""" + def __init__( + self, + *, + instanceId: _builtins.str = ..., + ) -> None: ... + _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId"] # noqa: Y015 + def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + +Global___TimerOriginChildWorkflowRetry: _TypeAlias = TimerOriginChildWorkflowRetry # noqa: Y015 + @_typing.final class TimerCreatedEvent(_message.Message): DESCRIPTOR: _descriptor.Descriptor @@ -360,6 +398,8 @@ class TimerCreatedEvent(_message.Message): RERUNPARENTINSTANCEINFO_FIELD_NUMBER: _builtins.int CREATETIMER_FIELD_NUMBER: _builtins.int EXTERNALEVENT_FIELD_NUMBER: _builtins.int + ACTIVITYRETRY_FIELD_NUMBER: _builtins.int + CHILDWORKFLOWRETRY_FIELD_NUMBER: _builtins.int name: _builtins.str @_builtins.property def fireAt(self) -> _timestamp_pb2.Timestamp: ... @@ -373,6 +413,10 @@ class TimerCreatedEvent(_message.Message): def createTimer(self) -> Global___TimerOriginCreateTimer: ... @_builtins.property def externalEvent(self) -> Global___TimerOriginExternalEvent: ... + @_builtins.property + def activityRetry(self) -> Global___TimerOriginActivityRetry: ... + @_builtins.property + def childWorkflowRetry(self) -> Global___TimerOriginChildWorkflowRetry: ... def __init__( self, *, @@ -381,16 +425,18 @@ class TimerCreatedEvent(_message.Message): rerunParentInstanceInfo: _orchestration_pb2.RerunParentInstanceInfo | None = ..., createTimer: Global___TimerOriginCreateTimer | None = ..., externalEvent: Global___TimerOriginExternalEvent | None = ..., + activityRetry: Global___TimerOriginActivityRetry | None = ..., + childWorkflowRetry: Global___TimerOriginChildWorkflowRetry | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 + _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "_rerunParentInstanceInfo", b"_rerunParentInstanceInfo", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin", "rerunParentInstanceInfo", b"rerunParentInstanceInfo"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... _WhichOneofReturnType__name: _TypeAlias = _typing.Literal["name"] # noqa: Y015 _WhichOneofArgType__name: _TypeAlias = _typing.Literal["_name", b"_name"] # noqa: Y015 _WhichOneofReturnType__rerunParentInstanceInfo: _TypeAlias = _typing.Literal["rerunParentInstanceInfo"] # noqa: Y015 _WhichOneofArgType__rerunParentInstanceInfo: _TypeAlias = _typing.Literal["_rerunParentInstanceInfo", b"_rerunParentInstanceInfo"] # noqa: Y015 - _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent"] # noqa: Y015 + _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent", "activityRetry", "childWorkflowRetry"] # noqa: Y015 _WhichOneofArgType_origin: _TypeAlias = _typing.Literal["origin", b"origin"] # noqa: Y015 @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__name) -> _WhichOneofReturnType__name | None: ... diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py index d63efa77..718bb76c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py @@ -28,7 +28,7 @@ from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_actions.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xc4\x01\n\x12ScheduleTaskAction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x04 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\tB\t\n\x07_router\"\xc6\x01\n\x19\x43reateChildWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x05 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x42\t\n\x07_router\"\xc9\x01\n\x11\x43reateTimerAction\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x03 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x04 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x42\x08\n\x06originB\x07\n\x05_name\"p\n\x0fSendEventAction\x12#\n\x08instance\x18\x01 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0c\n\x04name\x18\x02 \x01(\t\x12*\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\xaa\x02\n\x16\x43ompleteWorkflowAction\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12-\n\x07\x64\x65tails\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x30\n\nnewVersion\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12&\n\x0f\x63\x61rryoverEvents\x18\x05 \x03(\x0b\x32\r.HistoryEvent\x12+\n\x0e\x66\x61ilureDetails\x18\x06 \x01(\x0b\x32\x13.TaskFailureDetails\"l\n\x17TerminateWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x03 \x01(\x08\"#\n!WorkflowVersionNotAvailableAction\"\xd6\x03\n\x0eWorkflowAction\x12\n\n\x02id\x18\x01 \x01(\x05\x12+\n\x0cscheduleTask\x18\x02 \x01(\x0b\x32\x13.ScheduleTaskActionH\x00\x12\x39\n\x13\x63reateChildWorkflow\x18\x03 \x01(\x0b\x32\x1a.CreateChildWorkflowActionH\x00\x12)\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x12.CreateTimerActionH\x00\x12%\n\tsendEvent\x18\x05 \x01(\x0b\x32\x10.SendEventActionH\x00\x12\x33\n\x10\x63ompleteWorkflow\x18\x06 \x01(\x0b\x32\x17.CompleteWorkflowActionH\x00\x12\x35\n\x11terminateWorkflow\x18\x07 \x01(\x0b\x32\x18.TerminateWorkflowActionH\x00\x12I\n\x1bworkflowVersionNotAvailable\x18\n \x01(\x0b\x32\".WorkflowVersionNotAvailableActionH\x00\x12 \n\x06router\x18\t \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x14\n\x12workflowActionTypeB\t\n\x07_routerJ\x04\x08\x08\x10\tBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_actions.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xc4\x01\n\x12ScheduleTaskAction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x04 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x17\n\x0ftaskExecutionId\x18\x05 \x01(\tB\t\n\x07_router\"\xc6\x01\n\x19\x43reateChildWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12 \n\x06router\x18\x05 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x42\t\n\x07_router\"\xbb\x02\n\x11\x43reateTimerAction\x12*\n\x06\x66ireAt\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12.\n\x0b\x63reateTimer\x18\x03 \x01(\x0b\x32\x17.TimerOriginCreateTimerH\x00\x12\x32\n\rexternalEvent\x18\x04 \x01(\x0b\x32\x19.TimerOriginExternalEventH\x00\x12\x32\n\ractivityRetry\x18\x05 \x01(\x0b\x32\x19.TimerOriginActivityRetryH\x00\x12<\n\x12\x63hildWorkflowRetry\x18\x06 \x01(\x0b\x32\x1e.TimerOriginChildWorkflowRetryH\x00\x42\x08\n\x06originB\x07\n\x05_name\"p\n\x0fSendEventAction\x12#\n\x08instance\x18\x01 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0c\n\x04name\x18\x02 \x01(\t\x12*\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\xaa\x02\n\x16\x43ompleteWorkflowAction\x12,\n\x0eworkflowStatus\x18\x01 \x01(\x0e\x32\x14.OrchestrationStatus\x12,\n\x06result\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12-\n\x07\x64\x65tails\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x30\n\nnewVersion\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12&\n\x0f\x63\x61rryoverEvents\x18\x05 \x03(\x0b\x32\r.HistoryEvent\x12+\n\x0e\x66\x61ilureDetails\x18\x06 \x01(\x0b\x32\x13.TaskFailureDetails\"l\n\x17TerminateWorkflowAction\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x0f\n\x07recurse\x18\x03 \x01(\x08\"#\n!WorkflowVersionNotAvailableAction\"\xd6\x03\n\x0eWorkflowAction\x12\n\n\x02id\x18\x01 \x01(\x05\x12+\n\x0cscheduleTask\x18\x02 \x01(\x0b\x32\x13.ScheduleTaskActionH\x00\x12\x39\n\x13\x63reateChildWorkflow\x18\x03 \x01(\x0b\x32\x1a.CreateChildWorkflowActionH\x00\x12)\n\x0b\x63reateTimer\x18\x04 \x01(\x0b\x32\x12.CreateTimerActionH\x00\x12%\n\tsendEvent\x18\x05 \x01(\x0b\x32\x10.SendEventActionH\x00\x12\x33\n\x10\x63ompleteWorkflow\x18\x06 \x01(\x0b\x32\x17.CompleteWorkflowActionH\x00\x12\x35\n\x11terminateWorkflow\x18\x07 \x01(\x0b\x32\x18.TerminateWorkflowActionH\x00\x12I\n\x1bworkflowVersionNotAvailable\x18\n \x01(\x0b\x32\".WorkflowVersionNotAvailableActionH\x00\x12 \n\x06router\x18\t \x01(\x0b\x32\x0b.TaskRouterH\x01\x88\x01\x01\x42\x14\n\x12workflowActionTypeB\t\n\x07_routerJ\x04\x08\x08\x10\tBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -41,15 +41,15 @@ _globals['_CREATECHILDWORKFLOWACTION']._serialized_start=338 _globals['_CREATECHILDWORKFLOWACTION']._serialized_end=536 _globals['_CREATETIMERACTION']._serialized_start=539 - _globals['_CREATETIMERACTION']._serialized_end=740 - _globals['_SENDEVENTACTION']._serialized_start=742 - _globals['_SENDEVENTACTION']._serialized_end=854 - _globals['_COMPLETEWORKFLOWACTION']._serialized_start=857 - _globals['_COMPLETEWORKFLOWACTION']._serialized_end=1155 - _globals['_TERMINATEWORKFLOWACTION']._serialized_start=1157 - _globals['_TERMINATEWORKFLOWACTION']._serialized_end=1265 - _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_start=1267 - _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_end=1302 - _globals['_WORKFLOWACTION']._serialized_start=1305 - _globals['_WORKFLOWACTION']._serialized_end=1775 + _globals['_CREATETIMERACTION']._serialized_end=854 + _globals['_SENDEVENTACTION']._serialized_start=856 + _globals['_SENDEVENTACTION']._serialized_end=968 + _globals['_COMPLETEWORKFLOWACTION']._serialized_start=971 + _globals['_COMPLETEWORKFLOWACTION']._serialized_end=1269 + _globals['_TERMINATEWORKFLOWACTION']._serialized_start=1271 + _globals['_TERMINATEWORKFLOWACTION']._serialized_end=1379 + _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_start=1381 + _globals['_WORKFLOWVERSIONNOTAVAILABLEACTION']._serialized_end=1416 + _globals['_WORKFLOWACTION']._serialized_start=1419 + _globals['_WORKFLOWACTION']._serialized_end=1889 # @@protoc_insertion_point(module_scope) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi index 1b754f78..8df2644c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi @@ -104,6 +104,8 @@ class CreateTimerAction(_message.Message): NAME_FIELD_NUMBER: _builtins.int CREATETIMER_FIELD_NUMBER: _builtins.int EXTERNALEVENT_FIELD_NUMBER: _builtins.int + ACTIVITYRETRY_FIELD_NUMBER: _builtins.int + CHILDWORKFLOWRETRY_FIELD_NUMBER: _builtins.int name: _builtins.str @_builtins.property def fireAt(self) -> _timestamp_pb2.Timestamp: ... @@ -111,6 +113,10 @@ class CreateTimerAction(_message.Message): def createTimer(self) -> _history_events_pb2.TimerOriginCreateTimer: ... @_builtins.property def externalEvent(self) -> _history_events_pb2.TimerOriginExternalEvent: ... + @_builtins.property + def activityRetry(self) -> _history_events_pb2.TimerOriginActivityRetry: ... + @_builtins.property + def childWorkflowRetry(self) -> _history_events_pb2.TimerOriginChildWorkflowRetry: ... def __init__( self, *, @@ -118,14 +124,16 @@ class CreateTimerAction(_message.Message): name: _builtins.str | None = ..., createTimer: _history_events_pb2.TimerOriginCreateTimer | None = ..., externalEvent: _history_events_pb2.TimerOriginExternalEvent | None = ..., + activityRetry: _history_events_pb2.TimerOriginActivityRetry | None = ..., + childWorkflowRetry: _history_events_pb2.TimerOriginChildWorkflowRetry | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 + _HasFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["_name", b"_name", "activityRetry", b"activityRetry", "childWorkflowRetry", b"childWorkflowRetry", "createTimer", b"createTimer", "externalEvent", b"externalEvent", "fireAt", b"fireAt", "name", b"name", "origin", b"origin"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... _WhichOneofReturnType__name: _TypeAlias = _typing.Literal["name"] # noqa: Y015 _WhichOneofArgType__name: _TypeAlias = _typing.Literal["_name", b"_name"] # noqa: Y015 - _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent"] # noqa: Y015 + _WhichOneofReturnType_origin: _TypeAlias = _typing.Literal["createTimer", "externalEvent", "activityRetry", "childWorkflowRetry"] # noqa: Y015 _WhichOneofArgType_origin: _TypeAlias = _typing.Literal["origin", b"origin"] # noqa: Y015 @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__name) -> _WhichOneofReturnType__name | None: ... diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py new file mode 100644 index 00000000..e04c4da0 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/timer.py @@ -0,0 +1,142 @@ +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Timer-related helpers for the durabletask workflow SDK. + +This module keeps all timer / TimerOrigin concerns in one place, separate from +the generic action/event helpers in ``helpers.py``. +""" + +from datetime import datetime +from typing import Optional, Union + +import dapr.ext.workflow._durabletask.internal.protos as pb +from google.protobuf import timestamp_pb2 + +TimerOrigin = Union[ + pb.TimerOriginCreateTimer, + pb.TimerOriginExternalEvent, + pb.TimerOriginActivityRetry, + pb.TimerOriginChildWorkflowRetry, +] + +_ORIGIN_FIELD: dict[type, str] = { + pb.TimerOriginCreateTimer: 'createTimer', + pb.TimerOriginExternalEvent: 'externalEvent', + pb.TimerOriginActivityRetry: 'activityRetry', + pb.TimerOriginChildWorkflowRetry: 'childWorkflowRetry', +} + +# Sentinel fireAt used for "optional" TimerOriginExternalEvent timers that back +# an indefinite wait_for_external_event. The sentinel is +# 9999-12-31T23:59:59.999999999Z (nanosecond precision — Python's ``datetime`` +# is only microsecond-precision, so we build the Timestamp directly). +OPTIONAL_TIMER_FIRE_AT: timestamp_pb2.Timestamp = timestamp_pb2.Timestamp( + seconds=253402300799, nanos=999999999 +) + + +def _origin_kwargs(origin: Optional[TimerOrigin]) -> dict: + """Build keyword args that set the correct origin oneof field on a + ``CreateTimerAction`` / ``TimerCreatedEvent`` proto.""" + if origin is None: + return {} + return {_ORIGIN_FIELD[type(origin)]: origin} + + +def _to_timestamp( + fire_at: Union[datetime, timestamp_pb2.Timestamp], +) -> timestamp_pb2.Timestamp: + """Normalize a ``fire_at`` argument to a protobuf ``Timestamp``.""" + if isinstance(fire_at, timestamp_pb2.Timestamp): + return fire_at + ts = timestamp_pb2.Timestamp() + ts.FromDatetime(fire_at) + return ts + + +def _fire_at_matches_sentinel(ts: timestamp_pb2.Timestamp) -> bool: + """Return True iff ``ts`` exactly matches :data:`OPTIONAL_TIMER_FIRE_AT`. + + Both ``seconds`` and ``nanos`` are proto3 scalars that are always present + (default 0 when unset), so unconditional field access is safe. + """ + return ts.seconds == OPTIONAL_TIMER_FIRE_AT.seconds and ts.nanos == OPTIONAL_TIMER_FIRE_AT.nanos + + +def is_optional_timer_action(action: pb.WorkflowAction) -> bool: + """Returns True if the action is an optional TimerOriginExternalEvent timer + with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event. + + Pre-patch histories (from prior SDK versions that didn't schedule a timer + for indefinite waits) won't carry a matching TimerCreatedEvent; the replay + logic uses this check to drop the optional action and shift sequence ids. + """ + if not action.HasField('createTimer'): + return False + timer = action.createTimer + if timer.WhichOneof('origin') != 'externalEvent': + return False + return _fire_at_matches_sentinel(timer.fireAt) + + +def is_optional_timer_event(event: pb.HistoryEvent) -> bool: + """Returns True if a TimerCreatedEvent is the optional sentinel timer. + + For replay compatibility, treat a timerCreated event with the sentinel + fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when + reading histories emitted by older sidecars that didn't populate it). When + ``origin`` *is* populated, it must match TimerOriginExternalEvent. + """ + if not event.HasField('timerCreated'): + return False + timer = event.timerCreated + if not _fire_at_matches_sentinel(timer.fireAt): + return False + origin = timer.WhichOneof('origin') + return origin in (None, 'externalEvent') + + +def new_create_timer_action( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, +) -> pb.WorkflowAction: + ts = _to_timestamp(fire_at) + return pb.WorkflowAction( + id=timer_id, + createTimer=pb.CreateTimerAction(fireAt=ts, **_origin_kwargs(origin)), + ) + + +def new_timer_created_event( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], + origin: Optional[TimerOrigin] = None, +) -> pb.HistoryEvent: + ts = _to_timestamp(fire_at) + return pb.HistoryEvent( + eventId=timer_id, + timestamp=timestamp_pb2.Timestamp(), + timerCreated=pb.TimerCreatedEvent(fireAt=ts, **_origin_kwargs(origin)), + ) + + +def new_timer_fired_event( + timer_id: int, + fire_at: Union[datetime, timestamp_pb2.Timestamp], +) -> pb.HistoryEvent: + ts = _to_timestamp(fire_at) + return pb.HistoryEvent( + eventId=-1, + timestamp=timestamp_pb2.Timestamp(), + timerFired=pb.TimerFiredEvent(fireAt=ts, timerId=timer_id), + ) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index e52831a0..82bf062a 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -166,21 +166,35 @@ def call_sub_orchestrator( """ pass - # TODO: Add a timeout parameter, which allows the task to be canceled if the event is - # not received within the specified timeout. This requires support for task cancellation. @abstractmethod - def wait_for_external_event(self, name: str) -> Task: + def wait_for_external_event( + self, name: str, *, timeout: Optional[Union[datetime, timedelta]] = None + ) -> Task: """Wait asynchronously for an event to be raised with the name `name`. Parameters ---------- name : str The event name of the event that the task is waiting for. + timeout : datetime | timedelta | None + Controls how long to wait for the event. Accepts only ``datetime`` + or ``timedelta`` — a plain numeric ``0`` is **not** a valid value + (use ``timedelta(0)`` explicitly). Three shapes: + + * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. + An optional sentinel timer is scheduled internally for runtime + tracking, but ``TimeoutError`` is never raised on its own. + * ``timedelta(0)`` — do not wait at all. The returned task fails + immediately with ``TimeoutError``. + * A future ``datetime`` or a positive ``timedelta`` — wait until + that deadline / for that duration; ``TimeoutError`` is raised if + the event has not been received in time. Returns ------- Task[TOutput] - A Durable Task that completes when the event is received. + A Durable Task that completes when the event is received or fails + with ``TimeoutError`` if the timeout fires first. """ pass @@ -284,7 +298,7 @@ class Task(ABC, Generic[T]): """Abstract base class for asynchronous tasks in a durable orchestration.""" _result: T - _exception: Optional[TaskFailedError] + _exception: Optional[Exception] _parent: Optional[CompositeTask[T]] def __init__(self) -> None: @@ -311,7 +325,7 @@ def get_result(self) -> T: raise self._exception return self._result - def get_exception(self) -> TaskFailedError: + def get_exception(self) -> Exception: """Returns the exception that caused the task to fail.""" if self._exception is None: raise ValueError('The task has not failed.') @@ -400,6 +414,23 @@ def fail(self, message: str, details: pb.TaskFailureDetails): if self._parent is not None: self._parent.on_child_completed(self) + def cancel(self, exc: Exception) -> None: + """Mark this task as completed with the given exception. + + Used for non-runtime failures such as a zero-timeout + wait_for_external_event being canceled before any history event arrives. + + Idempotent: a noop if the task has already completed (success or + failure), which matches the common semantics of ``cancel()`` in + asyncio / threading APIs. + """ + if self._is_complete: + return + self._exception = exc + self._is_complete = True + if self._parent is not None: + self._parent.on_child_completed(self) + class RetryableTask(CompletableTask[T]): """A task that can be retried according to a retry policy.""" @@ -497,6 +528,54 @@ def on_child_completed(self, task: Task): self._parent.on_child_completed(self) +class ExternalEventWithTimeoutTask(CompositeTask[T]): + """A task that waits for an external event with a timeout. + + Completes with the event data if the event arrives first, or raises + ``TimeoutError`` if the timer fires first. + + When the timer wins, the optional ``on_timeout`` callback is invoked so the + caller can unregister the stale event task from ``_pending_events``, ensuring + that a late ``eventRaised`` for the same name is buffered rather than consumed + by the now-obsolete waiter. + """ + + def __init__( + self, + event_task: CompletableTask, + timer_task: TimerTask, + event_name: str, + timeout: Optional[Union[datetime, timedelta]] = None, + on_timeout: Optional[Callable[[], None]] = None, + ): + self._event_task = event_task + self._timer_task = timer_task + self._event_name = event_name + self._timeout = timeout + self._on_timeout = on_timeout + super().__init__([event_task, timer_task]) + + def on_child_completed(self, completed_task: Task): + if self.is_complete: + return + if completed_task is self._event_task: + if completed_task.is_failed: + self._exception = completed_task.get_exception() + else: + self._result = completed_task.get_result() + self._is_complete = True + elif completed_task is self._timer_task: + if self._on_timeout is not None: + self._on_timeout() + after = f' after {self._timeout!r}' if self._timeout is not None else '' + self._exception = TimeoutError( + f'Timed out{after} waiting for external event {self._event_name!r}' + ) + self._is_complete = True + if self._is_complete and self._parent is not None: + self._parent.on_child_completed(self) + + def when_all(tasks: list[Task[T]]) -> WhenAllTask[T]: """Returns a task that completes when all of the provided tasks complete or when one of the tasks fail.""" return WhenAllTask(tasks) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 7de121a1..b79f7eb8 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -30,7 +30,7 @@ import grpc from dapr.ext.workflow._durabletask import deterministic, task from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl -from google.protobuf import empty_pb2 +from google.protobuf import empty_pb2, timestamp_pb2 TInput = TypeVar('TInput') TOutput = TypeVar('TOutput') @@ -1068,6 +1068,11 @@ def run(self, generator: Generator[task.Task, Any, Any]): task = next(generator) # this starts the generator # TODO: Check if the task is null? self._previous_task = task + # If the first yielded task is already complete (e.g. wait_for_external_event + # with timeout=0 returns an immediately-canceled task), drive the generator + # forward now so the orchestrator doesn't get stuck waiting for an event. + if self._previous_task is not None and self._previous_task.is_complete: + self.resume() def resume(self): if self._generator is None: @@ -1183,6 +1188,40 @@ def next_sequence_number(self) -> int: self._sequence_number += 1 return self._sequence_number + def _drop_optional_pending_at(self, action_id: int) -> bool: + """Drop the pending optional timer action at ``action_id`` and shift every + pending action and pending task with id greater than ``action_id`` down by + one, also decrementing the sequence counter. + + Returns True if the action at ``action_id`` was an optional timer and was + dropped; False otherwise (caller should fall through to the normal + non-determinism error path). + + This preserves sequence-id determinism when replaying a pre-patch history + where an indefinite wait_for_external_event did not reserve a sequence + number for its optional timer. + """ + action = self._pending_actions.get(action_id) + if action is None or not ph.is_optional_timer_action(action): + return False + + del self._pending_actions[action_id] + self._pending_tasks.pop(action_id, None) + + # Shift every id > action_id down by one. + higher_action_ids = sorted(k for k in self._pending_actions if k > action_id) + for old_id in higher_action_ids: + a = self._pending_actions.pop(old_id) + a.id = old_id - 1 + self._pending_actions[old_id - 1] = a + higher_task_ids = sorted(k for k in self._pending_tasks if k > action_id) + for old_id in higher_task_ids: + t = self._pending_tasks.pop(old_id) + self._pending_tasks[old_id - 1] = t + + self._sequence_number -= 1 + return True + @property def app_id(self) -> str: return self._app_id @@ -1216,17 +1255,21 @@ def set_custom_status(self, custom_status: str) -> None: self._encoded_custom_status = custom_status def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: - return self.create_timer_internal(fire_at) + return self.create_timer_internal( + fire_at, + origin=pb.TimerOriginCreateTimer(), + ) def create_timer_internal( self, - fire_at: Union[datetime, timedelta], + fire_at: Union[datetime, timedelta, timestamp_pb2.Timestamp], retryable_task: Optional[task.RetryableTask] = None, - ) -> task.Task: + origin: Optional[ph.TimerOrigin] = None, + ) -> task.TimerTask: id = self.next_sequence_number() if isinstance(fire_at, timedelta): fire_at = self.current_utc_datetime + fire_at - action = ph.new_create_timer_action(id, fire_at) + action = ph.new_create_timer_action(id, fire_at, origin=origin) self._pending_actions[id] = action timer_task: task.TimerTask = task.TimerTask() @@ -1345,7 +1388,12 @@ def call_activity_function_helper( ) self._pending_tasks[id] = fn_task - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: # Check to see if this event has already been received, in which case we # can return it immediately. Otherwise, record out intent to receive an # event with the given name so that we can resume the generator when it @@ -1365,7 +1413,64 @@ def wait_for_external_event(self, name: str) -> task.Task: task_list = [] self._pending_events[event_name] = task_list task_list.append(external_event_task) - return external_event_task + + if external_event_task.is_complete: + return external_event_task + + # Three timeout shapes: + # - timedelta(0): immediately-canceled task (no timer) + # - positive: normal TimerOriginExternalEvent timer + # - None or negative: indefinite wait — an OPTIONAL timer is scheduled + # with a sentinel fireAt. See ph.OPTIONAL_TIMER_FIRE_AT + # and the replay-shift logic for how older histories + # (which didn't schedule the timer) are tolerated. + if isinstance(timeout, timedelta) and timeout == timedelta(0): + # Remove the task we just registered in _pending_events so the event + # doesn't try to complete it later. + pending = self._pending_events.get(event_name) + if pending and external_event_task in pending: + pending.remove(external_event_task) + if not pending: + del self._pending_events[event_name] + external_event_task.cancel( + TimeoutError( + f'Wait for external event {name!r} canceled immediately due to zero timeout' + ) + ) + return external_event_task + + is_indefinite = timeout is None or ( + isinstance(timeout, timedelta) and timeout < timedelta(0) + ) + fire_at: Union[datetime, timedelta, timestamp_pb2.Timestamp] + if is_indefinite: + fire_at = ph.OPTIONAL_TIMER_FIRE_AT + else: + # Narrowed by ``is_indefinite``: timeout is a positive timedelta or a datetime here. + assert timeout is not None + fire_at = timeout + + timer_task = self.create_timer_internal( + fire_at, + origin=pb.TimerOriginExternalEvent(name=name), + ) + + def _unregister_stale_event_task() -> None: + """Remove the event task from _pending_events so late eventRaised + messages are buffered instead of consumed by this timed-out waiter.""" + pending = self._pending_events.get(event_name) + if pending and external_event_task in pending: + pending.remove(external_event_task) + if not pending: + del self._pending_events[event_name] + + return task.ExternalEventWithTimeoutTask( + external_event_task, + timer_task, + event_name=name, + timeout=None if is_indefinite else timeout, + on_timeout=_unregister_stale_event_task, + ) def continue_as_new(self, new_input, *, save_events: bool = False) -> None: if self._is_complete: @@ -1543,6 +1648,29 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the timer was successfully scheduled. # Remove the timerCreated event from the pending action list so we don't schedule it again. timer_id = event.eventId + # Asymmetric case: pending is an optional timer but the incoming + # TimerCreated is a different (non-optional) timer — e.g., a user + # CreateTimer emitted by pre-patch code right after an indefinite + # wait_for_external_event. Drop the optional and shift so the + # real timer matches at the same id. + pending = ctx._pending_actions.get(timer_id) + if ( + pending is not None + and ph.is_optional_timer_action(pending) + and not ph.is_optional_timer_event(event) + ): + ctx._drop_optional_pending_at(timer_id) + pending = ctx._pending_actions.get(timer_id) + # Reverse asymmetric: the incoming TimerCreated is an optional timer + # from an older code version, but the current code has a different + # action at this id (or none at all). This happens when a patch adds + # new actions before the wait_for_external_event that originally + # produced the timer. Silently drop the stale optional event; the + # pending action stays in place to match its own future event. + if ph.is_optional_timer_event(event) and ( + pending is None or not ph.is_optional_timer_action(pending) + ): + return action = ctx._pending_actions.pop(timer_id, None) if not action: raise _get_non_determinism_error(timer_id, task.get_name(ctx.create_timer)) @@ -1580,6 +1708,13 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the activity execution was successfully scheduled. # Remove the taskScheduled event from the pending action list so we don't schedule it again. task_id = event.eventId + # If the pending action at this id is an optional timer from an + # indefinite wait_for_external_event that wasn't present in the + # pre-patch history, drop it and shift so this schedule matches. + if task_id in ctx._pending_actions and ph.is_optional_timer_action( + ctx._pending_actions[task_id] + ): + ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) activity_task = ctx._pending_tasks.get(task_id, None) if not action: @@ -1642,7 +1777,13 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven ctx.resume() else: activity_task.increment_attempt_count() - ctx.create_timer_internal(next_delay, activity_task) + ctx.create_timer_internal( + next_delay, + activity_task, + origin=pb.TimerOriginActivityRetry( + taskExecutionId=activity_task._task_execution_id, + ), + ) elif isinstance(activity_task, task.CompletableTask): activity_task.fail( f'{ctx.instance_id}: Activity task #{task_id} failed: {event.taskFailed.failureDetails.errorMessage}', @@ -1655,6 +1796,11 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven # This history event confirms that the sub-orchestration execution was successfully scheduled. # Remove the childWorkflowInstanceCreated event from the pending action list so we don't schedule it again. task_id = event.eventId + # If the pending action at this id is an optional timer, drop+shift. + if task_id in ctx._pending_actions and ph.is_optional_timer_action( + ctx._pending_actions[task_id] + ): + ctx._drop_optional_pending_at(task_id) action = ctx._pending_actions.pop(task_id, None) if not action: raise _get_non_determinism_error( @@ -1717,7 +1863,13 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven ctx.resume() else: sub_orch_task.increment_attempt_count() - ctx.create_timer_internal(next_delay, sub_orch_task) + ctx.create_timer_internal( + next_delay, + sub_orch_task, + origin=pb.TimerOriginChildWorkflowRetry( + instanceId=sub_orch_task._instance_id, + ), + ) elif isinstance(sub_orch_task, task.CompletableTask): sub_orch_task.fail( f'Sub-orchestration task #{task_id} failed: {failedEvent.failureDetails.errorMessage}', diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index f0c98a70..1f9f6417 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -145,9 +145,14 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id ) - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: self._logger.debug(f'{self.instance_id}: Waiting for external event {name}') - return self.__obj.wait_for_external_event(name) + return self.__obj.wait_for_external_event(name, timeout=timeout) def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: self._logger.debug(f'{self.instance_id}: Continuing as new') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index bfe5ce98..af31e84d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -162,18 +162,37 @@ def call_child_workflow( pass @abstractmethod - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event( + self, + name: str, + *, + timeout: Optional[Union[datetime, timedelta]] = None, + ) -> task.Task: """Wait asynchronously for an event to be raised with the name `name`. Parameters ---------- name : str The event name of the event that the task is waiting for. + timeout : datetime | timedelta | None + Controls how long to wait for the event. Accepts only ``datetime`` + or ``timedelta`` — a plain numeric ``0`` is **not** a valid value + (use ``timedelta(0)`` explicitly). Three shapes: + + * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely. + An optional sentinel timer is scheduled internally for runtime + tracking, but ``TimeoutError`` is never raised on its own. + * ``timedelta(0)`` — do not wait at all. The returned task fails + immediately with ``TimeoutError``. + * A future ``datetime`` or a positive ``timedelta`` — wait until + that deadline / for that duration; ``TimeoutError`` is raised if + the event has not been received in time. Returns ------- Task[TOutput] - A Durable Task that completes when the event is received. + A Durable Task that completes when the event is received or fails + with ``TimeoutError`` if the timeout fires first. """ pass diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 37b44d15..95315502 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -912,16 +912,23 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), ] - # Execute the orchestration until it is waiting for an external event. The result - # should be an empty list of actions because the orchestration didn't schedule any work. + # Execute the orchestration until it is waiting for an external event. An + # optional TimerOriginExternalEvent timer is scheduled (sentinel fireAt). executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 0 + assert len(actions) == 1 + assert actions[0].HasField('createTimer') - # Now send an external event to the orchestration and execute it again. This time - # the orchestration should complete. - old_events = new_events + # Post-patch replay: history contains the matching optional TimerCreated. The + # orchestration completes normally on event arrival. + old_events = new_events + [ + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='my_event'), + ) + ] new_events = [helpers.new_event_raised_event('my_event', encoded_input='42')] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) @@ -982,6 +989,13 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = [ helpers.new_workflow_started_event(), helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID), + # The external event wait creates an optional TimerOriginExternalEvent timer + # with the sentinel fireAt. Post-patch history contains the matching event. + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='my_event'), + ), ] new_events = [ helpers.new_suspend_event(), @@ -1855,6 +1869,791 @@ def parent(ctx: task.OrchestrationContext, _): ) +def test_create_timer_sets_create_timer_origin(): + """Tests that create_timer sets TimerOriginCreateTimer on the CreateTimerAction.""" + + def delay_orchestrator(ctx: task.OrchestrationContext, _): + due_time = ctx.current_utc_datetime + timedelta(seconds=5) + yield ctx.create_timer(due_time) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(delay_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'createTimer' + + +def test_wait_for_external_event_timeout_sets_external_event_origin(): + """Tests that wait_for_external_event with timeout creates a timer with + TimerOriginExternalEvent origin containing the event name.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=30)) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # The only action should be the timer (the external event wait doesn't produce an action) + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'externalEvent' + assert timer_action.externalEvent.name == 'myEvent' + + +def test_wait_for_external_event_timeout_fires_raises_timeout_error(): + """Tests that when the timeout fires before the event arrives, the task + raises a TimeoutError.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=5)) + return f'got: {result}' + except TimeoutError: + return 'timed out' + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + fire_at = start_time + timedelta(seconds=5) + + # First execution: creates the timer + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, fire_at), + ] + # Timer fires before event arrives + new_events = [ + helpers.new_timer_fired_event(1, fire_at), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('timed out') + + +def test_wait_for_external_event_with_timeout_event_arrives_first(): + """Tests that when the event arrives before the timeout, the task completes + with the event data.""" + + def timeout_orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=30)) + return f'got: {result}' + except TimeoutError: + return 'timed out' + + registry = worker._Registry() + name = registry.add_orchestrator(timeout_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + + # First execution: creates the timer (id=1) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, start_time + timedelta(seconds=30)), + ] + # Event arrives before the timer fires + new_events = [ + helpers.new_event_raised_event('myEvent', json.dumps('hello')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('got: hello') + + +def test_wait_for_external_event_timeout_cleans_up_pending_event(): + """When the timeout timer fires first, the stale event task is unregistered + from _pending_events so that a subsequent wait_for_external_event for the + same name can observe a later event rather than having it consumed by the + timed-out waiter.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=5)) + except TimeoutError: + pass + # Second wait for the same event name — must pick up the late event. + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(seconds=60)) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + fire_at = start_time + timedelta(seconds=5) + + # First wait creates timer at id=1, fires. Second wait creates timer at id=2. + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event(1, fire_at), + helpers.new_timer_fired_event(1, fire_at), + helpers.new_timer_created_event(2, start_time + timedelta(seconds=60)), + ] + # The late event arrives during the second wait. + new_events = [ + helpers.new_event_raised_event('myEvent', json.dumps('late hello')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('late hello') + + +def test_wait_for_external_event_indefinite_emits_optional_timer(): + """WaitForExternalEvent with no timeout (or a negative timeout) emits an + optional timer whose fireAt is the exact sentinel 9999-12-31T23:59:59.999999999Z.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'externalEvent' + assert timer_action.externalEvent.name == 'myEvent' + # Exact sentinel match — bit-for-bit, including nanoseconds. + assert timer_action.fireAt.seconds == helpers.OPTIONAL_TIMER_FIRE_AT.seconds + assert timer_action.fireAt.nanos == helpers.OPTIONAL_TIMER_FIRE_AT.nanos + assert helpers.is_optional_timer_action(actions[0]) + + +def test_wait_for_external_event_zero_timeout_emits_no_timer(): + """WaitForExternalEvent with timeout=0 emits no timer and the returned task is + immediately canceled (fails with TimeoutError).""" + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + result = yield ctx.wait_for_external_event('myEvent', timeout=timedelta(0)) + return f'got: {result}' + except TimeoutError: + return 'canceled' + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + new_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + # Only the completion action is emitted — no CreateTimerAction. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('canceled') + + +def test_post_patch_replay_optional_timer_matches_history(): + """A post-patch history containing the optional TimerCreated event replays + through the normal match path without shifting.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='myEvent'), + ), + ] + new_events = [helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('ok') + + +def test_post_patch_replay_optional_timer_with_unset_origin(): + """An older sidecar may emit TimerCreatedEvent without populating the proto3 + ``origin`` oneof. The sentinel fireAt alone must still classify the event as + optional so the replay matches our pending optional timer cleanly.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event('myEvent') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + # Sentinel fireAt but no origin set. + helpers.new_timer_created_event(1, helpers.OPTIONAL_TIMER_FIRE_AT), + ] + new_events = [helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('ok') + + +def test_patch_adds_activity_before_existing_wait(): + """Reproduces the versioning.py test5/test6 scenario. + + An in-flight orchestration had a wait_for_external_event that emitted an + optional TimerCreated event into history. Later, the code was patched to add + an ``is_patched`` check + activity call *before* the wait. On replay, the + new orchestration code emits a ScheduleTask at the id that the optional + TimerCreated occupies in history. The runtime must silently skip the stale + optional TimerCreated event rather than raising a non-determinism error.""" + + def dummy_activity(ctx, input): + return f'did: {input}' + + def patched_orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.call_activity(dummy_activity, input='start') + # NEW: is_patched branch added before the existing wait. Schedules an + # activity at the id previously occupied by the optional timer. + if ctx.is_patched('patch1'): + yield ctx.call_activity(dummy_activity, input='patch1 is patched') + else: + yield ctx.call_activity(dummy_activity, input='patch1 is not patched') + result = yield ctx.wait_for_external_event('evt') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(patched_orchestrator) + registry.add_activity(dummy_activity) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: start activity at id=1, optional timer at id=2. + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + helpers.new_task_completed_event(1, json.dumps('did: start')), + helpers.new_timer_created_event( + 2, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='evt'), + ), + ] + # Event arrives (triggers replay with the new code path). + new_events = [helpers.new_event_raised_event('evt', encoded_input=json.dumps('ok'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # Replay must NOT fail. It must emit the new patch1-not-patched activity as + # a pending action (id=2 — the slot the stale optional timer is skipped from). + # is_patched returns False during replay because the patch is not recorded in + # the original history's workflowStarted patches. + assert len(actions) == 1 + assert actions[0].HasField('scheduleTask') + assert actions[0].scheduleTask.input.value == json.dumps('patch1 is not patched') + + +def test_pre_patch_replay_indefinite_wait_then_activity(): + """A pre-patch history has the activity scheduled at id=1 (no reserved id for + the indefinite wait). The replay must drop the optional timer, shift the + activity down to id=1, and complete cleanly.""" + + def dummy_activity(ctx, _): + return 'activity result' + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + result = yield ctx.call_activity(dummy_activity) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + registry.add_activity(dummy_activity) + + # Pre-patch history: no timerCreated for the wait, activity scheduled at id=1 + # (which would have been id=2 under post-patch numbering). + start_time = datetime(2020, 1, 1, 12, 0, 0) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [helpers.new_task_completed_event(1, json.dumps('activity result'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # A single completion action — no phantom createTimer leaks. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('activity result') + + +def test_pre_patch_replay_indefinite_wait_then_child_workflow(): + """A pre-patch history with a child workflow scheduled after an indefinite + wait — the shift logic must work for childWorkflowInstanceCreated too.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + return 'child result' + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + result = yield ctx.call_sub_orchestrator('child_orchestrator') + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: child workflow scheduled at id=1 (no reserved id for wait). + child_instance_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', child_instance_id), + ] + new_events = [helpers.new_child_workflow_completed_event(1, json.dumps('child result'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('child result') + + +def test_pre_patch_replay_indefinite_wait_then_user_create_timer(): + """A pre-patch history has a user CreateTimer right after an indefinite wait. + Both the pending action and the incoming event are CreateTimer — the SDK must + distinguish optional (externalEvent + sentinel) from non-optional and shift.""" + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('myEvent') + yield ctx.create_timer(timedelta(seconds=5)) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch history: a non-optional (user) TimerCreated at id=1. The + # post-patch code would have emitted the optional timer at id=1 and the user + # timer at id=2; the shift must drop the optional and match the user timer. + user_fire_at = start_time + timedelta(seconds=5) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('myEvent', encoded_input=json.dumps('go')), + helpers.new_timer_created_event( + 1, + user_fire_at, + origin=pb.TimerOriginCreateTimer(), + ), + ] + new_events = [helpers.new_timer_fired_event(1, user_fire_at)] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('done') + + +def test_stale_optional_timer_event_does_not_match_user_timer(): + """A stale optional TimerCreated event in history must not be treated as + confirmation of a non-optional user CreateTimer that now occupies the same id. + + Scenario: older code had an indefinite wait_for_external_event (optional timer + at id=1). A patch replaced the wait with a user CreateTimer at the same id. + The stale optional TimerCreated must be dropped; the user timer must remain + pending and match its own (non-optional) TimerCreated on a future replay.""" + + def patched_orchestrator(ctx: task.OrchestrationContext, _): + # New code: user timer at id=1 (replaces the old indefinite wait). + yield ctx.create_timer(timedelta(seconds=10)) + return 'done' + + registry = worker._Registry() + name = registry.add_orchestrator(patched_orchestrator) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + user_fire_at = start_time + timedelta(seconds=10) + # History from the old code: optional timer at id=1 (stale). + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_timer_created_event( + 1, + helpers.OPTIONAL_TIMER_FIRE_AT, + origin=pb.TimerOriginExternalEvent(name='evt'), + ), + ] + # New events: the runtime confirms and fires the real user timer at id=1. + new_events = [ + helpers.new_timer_created_event( + 1, + user_fire_at, + origin=pb.TimerOriginCreateTimer(), + ), + helpers.new_timer_fired_event(1, user_fire_at), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # The stale optional event must have been dropped. The real user timer must + # have been confirmed and fired, completing the orchestration. + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('done') + + +def test_pre_patch_replay_two_indefinite_waits(): + """Two indefinite waits in sequence. Shifts must compose across multiple + optional timers.""" + + def dummy_activity(ctx, _): + return 'act result' + + def orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.wait_for_external_event('A') + yield ctx.call_activity(dummy_activity) + yield ctx.wait_for_external_event('B') + result = yield ctx.call_activity(dummy_activity) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + registry.add_activity(dummy_activity) + + start_time = datetime(2020, 1, 1, 12, 0, 0) + # Pre-patch numbering: first activity id=1, second activity id=2 (both waits + # consumed no sequence numbers). + activity_name = task.get_name(dummy_activity) + old_events = [ + helpers.new_workflow_started_event(start_time), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_event_raised_event('A', encoded_input=json.dumps('a')), + helpers.new_task_scheduled_event(1, activity_name), + helpers.new_task_completed_event(1, json.dumps('act result')), + helpers.new_event_raised_event('B', encoded_input=json.dumps('b')), + helpers.new_task_scheduled_event(2, activity_name), + ] + new_events = [helpers.new_task_completed_event(2, json.dumps('act result'))] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + complete_action = get_and_validate_single_complete_workflow_action(actions) + assert complete_action.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert complete_action.result.value == json.dumps('act result') + + +def test_activity_retry_timer_sets_activity_retry_origin(): + """Tests that retry timers for failed activities set TimerOriginActivityRetry + with the correct taskExecutionId.""" + + def dummy_activity(ctx, _): + raise ValueError('boom') + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # Attempt 1: scheduleTask(id=1) fails + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_task_failed_event(1, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + # The retry timer should have activityRetry origin + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'activityRetry' + assert timer_action.activityRetry.taskExecutionId != '' + + +def test_activity_retry_task_execution_id_stable_across_retries(): + """Tests that the taskExecutionId in TimerOriginActivityRetry is stable + across multiple retry attempts of the same logical activity call.""" + + def dummy_activity(ctx, _): + raise ValueError('boom') + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=4, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # Attempt 1: scheduleTask(id=1) fails -> retry timer(id=2) + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_task_failed_event(1, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_1 = actions[0].createTimer + first_task_execution_id = retry_timer_1.activityRetry.taskExecutionId + assert first_task_execution_id != '' + + # Timer fires -> scheduleTask(id=3), then fails -> retry timer(id=4) + old_events = old_events + new_events + current_timestamp = current_timestamp + timedelta(seconds=1) + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), + helpers.new_timer_fired_event(2, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('scheduleTask') + + # Attempt 2 fails + old_events = old_events + new_events + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_scheduled_event(3, task.get_name(dummy_activity)), + helpers.new_task_failed_event(3, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_2 = actions[0].createTimer + second_task_execution_id = retry_timer_2.activityRetry.taskExecutionId + + # Both retry timers must carry the SAME taskExecutionId + assert second_task_execution_id == first_task_execution_id + + +def test_child_workflow_retry_timer_sets_child_workflow_retry_origin(): + """Tests that retry timers for failed child workflows set + TimerOriginChildWorkflowRetry with the correct instanceId.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + raise ValueError('child failed') + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # First child created with id=1 -> instance_id = "abc123:0001" + expected_first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', expected_first_child_id), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_child_workflow_failed_event(1, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + timer_action = actions[0].createTimer + assert timer_action.WhichOneof('origin') == 'childWorkflowRetry' + assert timer_action.childWorkflowRetry.instanceId == expected_first_child_id + + +def test_child_workflow_retry_instance_id_always_points_to_first_child(): + """Tests that the instanceId in TimerOriginChildWorkflowRetry always + references the first child workflow's instance ID, even across multiple retries.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + raise ValueError('child failed') + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=4, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + # First child: sub-orch(id=1) -> instance_id = "abc123:0001" + expected_first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', expected_first_child_id), + ] + new_events = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_child_workflow_failed_event(1, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_1 = actions[0].createTimer + assert retry_timer_1.childWorkflowRetry.instanceId == expected_first_child_id + + # Timer fires -> new child sub-orch(id=3) with a DIFFERENT instance_id + old_events = old_events + new_events + current_timestamp = current_timestamp + timedelta(seconds=1) + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), + helpers.new_timer_fired_event(2, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createChildWorkflow') + + # Second child fails. Simulate the second child having a DIFFERENT instance ID + # (e.g. if a backend assigned a new ID on retry) to prove the timer origin still + # references the FIRST child's ID regardless. + expected_second_child_id = f'{TEST_INSTANCE_ID}:0003' + old_events = old_events + new_events + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_second_child_id), + helpers.new_child_workflow_failed_event(3, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + assert len(actions) == 1 + assert actions[0].HasField('createTimer') + retry_timer_2 = actions[0].createTimer + + # Retry timer 2 must ALSO point to the FIRST child's instance ID, + # NOT the second child's ID. + assert retry_timer_2.childWorkflowRetry.instanceId == expected_first_child_id + assert retry_timer_2.childWorkflowRetry.instanceId != expected_second_child_id + + def get_and_validate_single_complete_workflow_action( actions: list[pb.WorkflowAction], ) -> pb.CompleteWorkflowAction: