Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,26 @@ def test_my_durable_functions():
three_result: StepOperation = result.get_step("three")
assert three_result.result == '"5 6"'
```

### Skipping Time in Tests

By default, wait operations use real time delays. For faster test execution, use the `skip_time` parameter:

```python
def test_with_skip_time():
with DurableFunctionTestRunner(handler=function_under_test) as runner:
# Wait operations complete immediately
result = runner.run(input="input str", timeout=10, skip_time=True)

assert result.status is InvocationStatus.SUCCEEDED
```

You can also use the `DURABLE_EXECUTION_TIME_SCALE` environment variable to control wait durations:
- `DURABLE_EXECUTION_TIME_SCALE=0` - Skip all waits (same as `skip_time=True`)
- `DURABLE_EXECUTION_TIME_SCALE=0.5` - Half speed
- `DURABLE_EXECUTION_TIME_SCALE=2.0` - Double speed


## Architecture
![Durable Functions Python Test Framework Architecture](/assets/dar-python-test-framework-architecture.svg)

Expand Down
3 changes: 2 additions & 1 deletion examples/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ def run(
self,
input: str | None = None, # noqa: A002
timeout: int = 60,
skip_time: bool = False,
) -> DurableFunctionTestResult:
"""Execute the durable function and return results."""
return self._runner.run(input=input, timeout=timeout)
return self._runner.run(input=input, timeout=timeout, skip_time=skip_time)

def run_async(
self,
Expand Down
2 changes: 1 addition & 1 deletion examples/test/wait/test_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
def test_wait(durable_runner):
"""Test wait example."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)
result = durable_runner.run(input="test", timeout=10, skip_time=True)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == "Wait completed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
class CheckpointProcessor:
"""Handle OperationUpdate transformations and execution state updates."""

def __init__(self, store: ExecutionStore, scheduler: Scheduler):
def __init__(
self, store: ExecutionStore, scheduler: Scheduler, time_scale: float = 1.0
):
self._store = store
self._scheduler = scheduler
self._notifier = ExecutionNotifier()
self._transformer = OperationTransformer()
self._transformer = OperationTransformer(time_scale=time_scale)

def add_execution_observer(self, observer) -> None:
"""Add observer for execution events."""
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we only use time scaling for wait, but what about step retries?

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@
class WaitProcessor(OperationProcessor):
"""Processes WAIT operation updates with timer scheduling."""

def __init__(self, time_scale: float = 1.0):
"""Initialize WaitProcessor with time scale.

Args:
time_scale: Multiplier for wait durations. Use 0.0 to skip waits entirely.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any side-effects with very small/zero wait times for Python? In the TS testing library I had to implement a custom queue scheduler to ensure that invocations completed when skipTime was enabled, so I'm not sure if Python would have the same issues.

(PR for that change: aws/aws-durable-execution-sdk-js#298)

Defaults to 1.0 (real time). Can also be overridden by
DURABLE_EXECUTION_TIME_SCALE environment variable.
"""
self._time_scale = time_scale

def process(
self,
update: OperationUpdate,
Expand All @@ -43,8 +53,11 @@ def process(
wait_seconds = (
update.wait_options.wait_seconds if update.wait_options else 0
)
time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0"))
logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale)
# Environment variable takes precedence
time_scale = float(
os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale))
)
logging.info("Using time_scale: %f", time_scale)
scaled_wait_seconds = wait_seconds * time_scale

scheduled_end_timestamp = datetime.now(UTC) + timedelta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@
class OperationTransformer:
"""Transforms OperationUpdates to Operations while maintaining order and triggering scheduler actions."""

_DEFAULT_PROCESSORS: ClassVar[dict[OperationType, OperationProcessor]] = {
OperationType.STEP: StepProcessor(),
OperationType.WAIT: WaitProcessor(),
OperationType.CONTEXT: ContextProcessor(),
OperationType.CALLBACK: CallbackProcessor(),
OperationType.EXECUTION: ExecutionProcessor(),
}

def __init__(
self,
processors: MutableMapping[OperationType, OperationProcessor] | None = None,
time_scale: float = 1.0,
):
self.processors = processors if processors else self._DEFAULT_PROCESSORS
if processors:
self.processors = processors
else:
self.processors = {
OperationType.STEP: StepProcessor(),
OperationType.WAIT: WaitProcessor(time_scale=time_scale),
OperationType.CONTEXT: ContextProcessor(),
OperationType.CALLBACK: CallbackProcessor(),
OperationType.EXECUTION: ExecutionProcessor(),
}

def process_updates(
self,
Expand Down
31 changes: 30 additions & 1 deletion src/aws_durable_execution_sdk_python_testing/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
from aws_durable_execution_sdk_python_testing.checkpoint.processor import (
CheckpointProcessor,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.wait import (
WaitProcessor,
)
from aws_durable_execution_sdk_python_testing.client import InMemoryServiceClient
from aws_durable_execution_sdk_python_testing.exceptions import (
DurableFunctionsLocalRunnerError,
Expand Down Expand Up @@ -606,7 +609,28 @@ def run(
function_name: str = "test-function",
execution_name: str = "execution-name",
account_id: str = "123456789012",
skip_time: bool = False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than constrict this to a bool, why not have wait_multiplier or wait_scale available here - also this is what it maps to internally anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I considered that but I think having skip_time is a nicer interface since that's what will be more commonly used for unit tests, don't you think? We could add both?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there should rather be one obvious way of achieving this, rather than multiplying (ha :-) ) options.

I'd argue the bool removes functionality unnecessarily, and specifying wait_multiplier = 0 is not particularly harder than skip_time=false.

) -> DurableFunctionTestResult:
"""Run the durable function and wait for completion.

Args:
input: Input payload for the function
timeout: Maximum execution time in seconds
function_name: Name of the function
execution_name: Name of the execution
account_id: AWS account ID
skip_time: If True, wait operations complete immediately. If False (default),
wait operations use real time delays.

Returns:
Test result containing execution status and operations
"""
# Update time_scale in checkpoint processor for this run
time_scale = 0.0 if skip_time else 1.0
self._checkpoint_processor._transformer.processors[OperationType.WAIT] = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't quite in the spirit of the dependency injection patterns here - it's effectively monkey-patching the processor after initialization.

1. runner.py - DurableFunctionTestRunner.init()

def __init__(
    self,
    handler: Callable,
    wait_multiplier: float = 1.0,  # NEW PARAMETER
    # ... other existing params
):
    # ... existing code ...
    
    # Pass wait_multiplier to CheckpointProcessor
    self._checkpoint_processor = CheckpointProcessor(
        store=self._store,
        scheduler=self._scheduler,
        time_scale=wait_multiplier,  # NEW ARGUMENT
    )

2. checkpoint/processor.py - CheckpointProcessor.init()

def __init__(
    self,
    store: ExecutionStore,
    scheduler: Scheduler,
    time_scale: float = 1.0,  # NEW PARAMETER
):
    self._store = store
    self._scheduler = scheduler
    self._notifier = ExecutionNotifier()
    
    # Use factory method with time_scale
    self._transformer = OperationTransformer.create(time_scale=time_scale)

3. checkpoint/transformer.py - OperationTransformer

class OperationTransformer:
    """Transforms OperationUpdates to Operations."""
    
    def __init__(
        self,
        processors: MutableMapping[OperationType, OperationProcessor],  # NOW REQUIRED
    ):
        """Initialize with explicit processor mapping."""
        self.processors = processors
    
    @classmethod
    def create(cls, time_scale: float = 1.0) -> OperationTransformer:  # NEW FACTORY METHOD
        """Create OperationTransformer with standard processors.
        
        Args:
            time_scale: Multiplier for wait durations. 0.0 = skip, 1.0 = real time.
        """
        processors = {
            OperationType.STEP: StepProcessor(),
            OperationType.WAIT: WaitProcessor(time_scale=time_scale),  # PASS time_scale
            OperationType.CONTEXT: ContextProcessor(),
            OperationType.CALLBACK: CallbackProcessor(),
            OperationType.EXECUTION: ExecutionProcessor(),
        }
        return cls(processors=processors)

4. checkpoint/processors/wait.py - WaitProcessor

class WaitProcessor(OperationProcessor):
    """Processes WAIT operation updates with timer scheduling."""
    
    def __init__(self, time_scale: float = 1.0):  # NEW CONSTRUCTOR
        """Initialize WaitProcessor with time scale.
        
        Args:
            time_scale: Multiplier for wait durations. Can be overridden by
                       DURABLE_EXECUTION_TIME_SCALE environment variable.
        """
        self._time_scale = time_scale
    
    def process(self, update, current_op, notifier, execution_arn):
        """Process WAIT operation update."""
        match update.action:
            case OperationAction.START:
                wait_seconds = update.wait_options.wait_seconds if update.wait_options else 0
                
                # Environment variable overrides constructor parameter
                time_scale = float(
                    os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale))
                )
                
                scaled_wait_seconds = wait_seconds * time_scale
                # ... rest of implementation

WaitProcessor(time_scale=time_scale)
)

execution_arn = self.run_async(
input=input,
timeout=timeout,
Expand Down Expand Up @@ -907,8 +931,13 @@ def run(
self,
input: str | None = None, # noqa: A002
timeout: int = 60,
skip_time: bool = False, # noqa: ARG002
) -> DurableFunctionTestResult:
"""Execute function on AWS Lambda and wait for completion."""
"""Execute function on AWS Lambda and wait for completion.

Note: skip_time parameter is ignored for cloud runner as timing is
controlled by the Lambda service.
"""
logger.info(
"Invoking Lambda function: %s (timeout: %ds)", self.function_name, timeout
)
Expand Down
Loading