Skip to content

[Feature]: Expose whether the operation is just started to execute method #414

@zhongkechen

Description

@zhongkechen

What would you like?

Currently check_result_status always return the checkpointed result after STARTED is checkpointed. This hides the information of whether this operation is the first time started at the execution phase in execute method. This information is required in some cases. For example, when calling plugin interface, we want to know if a CONTEXT operation is calling the user function for the first time.

Possible Implementation

refactor the base OperationExecutor interface to carry more info

@dataclass(frozen=True)
class CheckResult(Generic[T]):
    is_ready_to_execute: bool
    has_checkpointed_result: bool
    checkpointed_result: CheckpointedResult | None = None
    deserialized_result: T | None = None

class OperationExecutor(ABC, Generic[T]):
    @abstractmethod
    def check_result_status(self) -> CheckResult[T]:
        ...

    @abstractmethod
    def execute(self, checkpointed_result: CheckpointedResult) -> T:
        ...

    def process(self) -> T:
        result = self.check_result_status()
        if not result.is_ready_to_execute and not result.has_checkpointed_result:
            result = self.check_result_status()
        if result.has_checkpointed_result:
            return result.deserialized_result
        if result.is_ready_to_execute:
            if result.checkpointed_result is None:
                msg = "CheckResult is marked ready to execute but checkpointed result is not set."
                raise InvalidStateError(msg)
            return self.execute(result.checkpointed_result)   # <<--- this is currently always called with checkpointed result after STARTED

We have a few choices

  1. choose the JS SDK style - each operation executor (handler) handles its own logic without sharing anything with others. This means execute and check_result_status methods are internal to each operation. The operation executors don't have to implement the same interface.
  2. choose the Java SDK style - operations share the common parts in BaseOperation while each operation implement its own start and replay methods. Using start and replay is more flexible than execute and check_result_status.
  3. fix this with minimal changes required for plugin interface: patch ChildOperationExecutor in Python SDK to override process method and implement the required logic specifically for what the plugin interface requires

Is this a breaking change?

No

Does this require an RFC?

No

Additional Context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions