feat: define child workflow state machine#99
Conversation
8d1beff to
ef8f7e5
Compare
Codecov Report❌ Patch coverage is
🚀 New features to boost your workflow:
|
|
Gitar is not allowed to push to this forked PR. |
| if self.state is DecisionState.REQUESTED: | ||
| return decision.Decision( | ||
| start_child_workflow_execution_decision_attributes=self.request | ||
| ) | ||
| if self.state is DecisionState.CANCELED_AFTER_REQUESTED: | ||
| return record_immediate_cancel(self.request) | ||
| if self.state in ( | ||
| DecisionState.CANCELED_AFTER_RECORDED, | ||
| DecisionState.CANCELED_AFTER_STARTED, | ||
| ): |
There was a problem hiding this comment.
nit: use switch in python match
| id_for_event = getattr(event_attributes, action.id_attr) | ||
| # This may be a reference via the user id or a reference to a previous event. | ||
| # Supports dotted paths (e.g. "workflow_execution.workflow_id") for nested fields. | ||
| id_for_event = resolve_id_attr(event_attributes, action.id_attr) |
There was a problem hiding this comment.
just a comment: we might want to revisit to simplify the logic. The complexity is to point an event to a statemachine id.
In the past, it's simply resolved by alias logic. Now with childworkflow, the mapping logic becomes more complicated.
| ) | ||
| machine = ChildWorkflowExecutionStateMachine(attrs, execution, result) | ||
| self._add_state_machine(machine) | ||
| return execution, result |
There was a problem hiding this comment.
why do we need the result here?
| class ChildWorkflowError(Exception): | ||
| """Base class for all child workflow lifecycle errors.""" | ||
|
|
||
| pass | ||
|
|
||
|
|
||
| class StartChildWorkflowExecutionFailed(ChildWorkflowError): | ||
| def __init__(self, message: str, cause: Any, workflow_id: str) -> None: | ||
| super().__init__(message) | ||
| self.cause = cause | ||
| self.workflow_id = workflow_id | ||
|
|
||
|
|
||
| class ChildWorkflowExecutionFailed(ChildWorkflowError): | ||
| def __init__(self, message: str, failure: Any) -> None: | ||
| super().__init__(message) | ||
| self.failure = failure | ||
|
|
||
|
|
||
| class ChildWorkflowExecutionCanceled(ChildWorkflowError): | ||
| def __init__(self, message: str, details: Any) -> None: | ||
| super().__init__(message) | ||
| self.details = details | ||
|
|
||
|
|
||
| class ChildWorkflowExecutionTimedOut(ChildWorkflowError): | ||
| def __init__(self, message: str, timeout_type: int) -> None: | ||
| super().__init__(message) | ||
| self.timeout_type = timeout_type | ||
|
|
||
|
|
||
| class ChildWorkflowExecutionTerminated(ChildWorkflowError): |
There was a problem hiding this comment.
All errors in this file is exposed to Client. We don't have APIs on child workflows and thus we don't need it here. The suggestion would be to move to where the error is needed.
There was a problem hiding this comment.
I don't think you intended to update this
natemort
left a comment
There was a problem hiding this comment.
LGTM, +1 to Shijie's comments.
| * A method can only be one of @workflow.run, @workflow.signal, or @workflow.query. | ||
|
|
||
| Args: | ||
| name: The name of the query type. If not provided, use the function name. |
There was a problem hiding this comment.
💡 Quality: Query decorator docstring contradicts implementation
The docstring for query() states "If not provided, use the function name" (line 391), but the implementation immediately raises ValueError("name is required") when name is None (line 399-400). The docstring should be updated to match the actual behavior, or the implementation should fall back to f.__name__ like the docstring promises.
The signal decorator has the same pattern (requires name), so this is consistent behavior but the docstring is misleading.
Fix 1: Fix docstring to match implementation (name is required)
Args:
name: The name of the query type. Required.
- Apply fix
Fix 2: Alternatively, implement the fallback to function name as documented
if name is None:
def decorator(f: T) -> T:
f._workflow_query = f.__name__ # type: ignore[attr-defined]
return f
return decorator
def decorator(f: T) -> T:
f._workflow_query = name # type: ignore[attr-defined]
return f
return decorator
- Apply fix
Check a box to apply a fix or reply for a change | Was this helpful? React with 👍 / 👎
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
CI failed: The test `test_async_signal_handler_direct_mutation` failed due to a workflow timeout, likely caused by regressions in the new state machine logic introduced in this PR.OverviewThe integration test suite failed during the FailuresWorkflow Timeout in
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
What changed?
define child workflow state machine
Why?
essential step for implement child wf
How did you test it?
unit tests
Potential risks
Release notes
Documentation Changes