diff --git a/README.md b/README.md index 85824d6..54d5065 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,26 @@ def test_my_durable_functions(): three_result: StepOperation = result.get_step("three") assert three_result.result == '"5 6"' ``` + +### Skipping Time in Tests + +By default, wait operations use real time delays. For faster test execution, use the `skip_time` parameter: + +```python +def test_with_skip_time(): + with DurableFunctionTestRunner(handler=function_under_test) as runner: + # Wait operations complete immediately + result = runner.run(input="input str", timeout=10, skip_time=True) + + assert result.status is InvocationStatus.SUCCEEDED +``` + +You can also use the `DURABLE_EXECUTION_TIME_SCALE` environment variable to control wait durations: +- `DURABLE_EXECUTION_TIME_SCALE=0` - Skip all waits (same as `skip_time=True`) +- `DURABLE_EXECUTION_TIME_SCALE=0.5` - Half speed +- `DURABLE_EXECUTION_TIME_SCALE=2.0` - Double speed + + ## Architecture ![Durable Functions Python Test Framework Architecture](/assets/dar-python-test-framework-architecture.svg) diff --git a/examples/test/conftest.py b/examples/test/conftest.py index 679ba48..8ed118b 100644 --- a/examples/test/conftest.py +++ b/examples/test/conftest.py @@ -104,9 +104,10 @@ def run( self, input: str | None = None, # noqa: A002 timeout: int = 60, + skip_time: bool = False, ) -> DurableFunctionTestResult: """Execute the durable function and return results.""" - return self._runner.run(input=input, timeout=timeout) + return self._runner.run(input=input, timeout=timeout, skip_time=skip_time) def run_async( self, diff --git a/examples/test/wait/test_wait.py b/examples/test/wait/test_wait.py index 66cd627..8403194 100644 --- a/examples/test/wait/test_wait.py +++ b/examples/test/wait/test_wait.py @@ -15,7 +15,7 @@ def test_wait(durable_runner): """Test wait example.""" with durable_runner: - result = durable_runner.run(input="test", timeout=10) + result = durable_runner.run(input="test", timeout=10, skip_time=True) assert result.status is InvocationStatus.SUCCEEDED assert deserialize_operation_payload(result.result) == "Wait completed" diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py index 04b991c..50ad11c 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py @@ -33,11 +33,13 @@ class CheckpointProcessor: """Handle OperationUpdate transformations and execution state updates.""" - def __init__(self, store: ExecutionStore, scheduler: Scheduler): + def __init__( + self, store: ExecutionStore, scheduler: Scheduler, time_scale: float = 1.0 + ): self._store = store self._scheduler = scheduler self._notifier = ExecutionNotifier() - self._transformer = OperationTransformer() + self._transformer = OperationTransformer(time_scale=time_scale) def add_execution_observer(self, observer) -> None: """Add observer for execution events.""" diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py index 01cc69b..f0c5f90 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py @@ -30,6 +30,16 @@ class WaitProcessor(OperationProcessor): """Processes WAIT operation updates with timer scheduling.""" + def __init__(self, time_scale: float = 1.0): + """Initialize WaitProcessor with time scale. + + Args: + time_scale: Multiplier for wait durations. Use 0.0 to skip waits entirely. + Defaults to 1.0 (real time). Can also be overridden by + DURABLE_EXECUTION_TIME_SCALE environment variable. + """ + self._time_scale = time_scale + def process( self, update: OperationUpdate, @@ -43,8 +53,11 @@ def process( wait_seconds = ( update.wait_options.wait_seconds if update.wait_options else 0 ) - time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0")) - logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale) + # Environment variable takes precedence + time_scale = float( + os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale)) + ) + logging.info("Using time_scale: %f", time_scale) scaled_wait_seconds = wait_seconds * time_scale scheduled_end_timestamp = datetime.now(UTC) + timedelta( diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py index cd37b8a..617be64 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py @@ -43,19 +43,21 @@ class OperationTransformer: """Transforms OperationUpdates to Operations while maintaining order and triggering scheduler actions.""" - _DEFAULT_PROCESSORS: ClassVar[dict[OperationType, OperationProcessor]] = { - OperationType.STEP: StepProcessor(), - OperationType.WAIT: WaitProcessor(), - OperationType.CONTEXT: ContextProcessor(), - OperationType.CALLBACK: CallbackProcessor(), - OperationType.EXECUTION: ExecutionProcessor(), - } - def __init__( self, processors: MutableMapping[OperationType, OperationProcessor] | None = None, + time_scale: float = 1.0, ): - self.processors = processors if processors else self._DEFAULT_PROCESSORS + if processors: + self.processors = processors + else: + self.processors = { + OperationType.STEP: StepProcessor(), + OperationType.WAIT: WaitProcessor(time_scale=time_scale), + OperationType.CONTEXT: ContextProcessor(), + OperationType.CALLBACK: CallbackProcessor(), + OperationType.EXECUTION: ExecutionProcessor(), + } def process_updates( self, diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index 34e80f4..3c93c05 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -35,6 +35,9 @@ from aws_durable_execution_sdk_python_testing.checkpoint.processor import ( CheckpointProcessor, ) +from aws_durable_execution_sdk_python_testing.checkpoint.processors.wait import ( + WaitProcessor, +) from aws_durable_execution_sdk_python_testing.client import InMemoryServiceClient from aws_durable_execution_sdk_python_testing.exceptions import ( DurableFunctionsLocalRunnerError, @@ -606,7 +609,28 @@ def run( function_name: str = "test-function", execution_name: str = "execution-name", account_id: str = "123456789012", + skip_time: bool = False, ) -> DurableFunctionTestResult: + """Run the durable function and wait for completion. + + Args: + input: Input payload for the function + timeout: Maximum execution time in seconds + function_name: Name of the function + execution_name: Name of the execution + account_id: AWS account ID + skip_time: If True, wait operations complete immediately. If False (default), + wait operations use real time delays. + + Returns: + Test result containing execution status and operations + """ + # Update time_scale in checkpoint processor for this run + time_scale = 0.0 if skip_time else 1.0 + self._checkpoint_processor._transformer.processors[OperationType.WAIT] = ( + WaitProcessor(time_scale=time_scale) + ) + execution_arn = self.run_async( input=input, timeout=timeout, @@ -907,8 +931,13 @@ def run( self, input: str | None = None, # noqa: A002 timeout: int = 60, + skip_time: bool = False, # noqa: ARG002 ) -> DurableFunctionTestResult: - """Execute function on AWS Lambda and wait for completion.""" + """Execute function on AWS Lambda and wait for completion. + + Note: skip_time parameter is ignored for cloud runner as timing is + controlled by the Lambda service. + """ logger.info( "Invoking Lambda function: %s (timeout: %ds)", self.function_name, timeout )