-
Notifications
You must be signed in to change notification settings - Fork 1
feat: add skip_time parameter to test runner #192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,16 @@ | |
| class WaitProcessor(OperationProcessor): | ||
| """Processes WAIT operation updates with timer scheduling.""" | ||
|
|
||
| def __init__(self, time_scale: float = 1.0): | ||
| """Initialize WaitProcessor with time scale. | ||
|
|
||
| Args: | ||
| time_scale: Multiplier for wait durations. Use 0.0 to skip waits entirely. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any side-effects with very small/zero wait times for Python? In the TS testing library I had to implement a custom queue scheduler to ensure that invocations completed when skipTime was enabled, so I'm not sure if Python would have the same issues. (PR for that change: aws/aws-durable-execution-sdk-js#298) |
||
| Defaults to 1.0 (real time). Can also be overridden by | ||
| DURABLE_EXECUTION_TIME_SCALE environment variable. | ||
| """ | ||
| self._time_scale = time_scale | ||
|
|
||
| def process( | ||
| self, | ||
| update: OperationUpdate, | ||
|
|
@@ -43,8 +53,11 @@ def process( | |
| wait_seconds = ( | ||
| update.wait_options.wait_seconds if update.wait_options else 0 | ||
| ) | ||
| time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0")) | ||
| logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale) | ||
| # Environment variable takes precedence | ||
| time_scale = float( | ||
| os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale)) | ||
| ) | ||
| logging.info("Using time_scale: %f", time_scale) | ||
| scaled_wait_seconds = wait_seconds * time_scale | ||
|
|
||
| scheduled_end_timestamp = datetime.now(UTC) + timedelta( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,9 @@ | |
| from aws_durable_execution_sdk_python_testing.checkpoint.processor import ( | ||
| CheckpointProcessor, | ||
| ) | ||
| from aws_durable_execution_sdk_python_testing.checkpoint.processors.wait import ( | ||
| WaitProcessor, | ||
| ) | ||
| from aws_durable_execution_sdk_python_testing.client import InMemoryServiceClient | ||
| from aws_durable_execution_sdk_python_testing.exceptions import ( | ||
| DurableFunctionsLocalRunnerError, | ||
|
|
@@ -606,7 +609,28 @@ def run( | |
| function_name: str = "test-function", | ||
| execution_name: str = "execution-name", | ||
| account_id: str = "123456789012", | ||
| skip_time: bool = False, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rather than constrict this to a bool, why not have
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah - I considered that but I think having
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there should rather be one obvious way of achieving this, rather than multiplying (ha :-) ) options. I'd argue the bool removes functionality unnecessarily, and specifying |
||
| ) -> DurableFunctionTestResult: | ||
| """Run the durable function and wait for completion. | ||
|
|
||
| Args: | ||
| input: Input payload for the function | ||
| timeout: Maximum execution time in seconds | ||
| function_name: Name of the function | ||
| execution_name: Name of the execution | ||
| account_id: AWS account ID | ||
| skip_time: If True, wait operations complete immediately. If False (default), | ||
| wait operations use real time delays. | ||
|
|
||
| Returns: | ||
| Test result containing execution status and operations | ||
| """ | ||
| # Update time_scale in checkpoint processor for this run | ||
| time_scale = 0.0 if skip_time else 1.0 | ||
| self._checkpoint_processor._transformer.processors[OperationType.WAIT] = ( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this isn't quite in the spirit of the dependency injection patterns here - it's effectively monkey-patching the processor after initialization. 1. runner.py - DurableFunctionTestRunner.init()def __init__(
self,
handler: Callable,
wait_multiplier: float = 1.0, # NEW PARAMETER
# ... other existing params
):
# ... existing code ...
# Pass wait_multiplier to CheckpointProcessor
self._checkpoint_processor = CheckpointProcessor(
store=self._store,
scheduler=self._scheduler,
time_scale=wait_multiplier, # NEW ARGUMENT
)2. checkpoint/processor.py - CheckpointProcessor.init()def __init__(
self,
store: ExecutionStore,
scheduler: Scheduler,
time_scale: float = 1.0, # NEW PARAMETER
):
self._store = store
self._scheduler = scheduler
self._notifier = ExecutionNotifier()
# Use factory method with time_scale
self._transformer = OperationTransformer.create(time_scale=time_scale)3. checkpoint/transformer.py - OperationTransformerclass OperationTransformer:
"""Transforms OperationUpdates to Operations."""
def __init__(
self,
processors: MutableMapping[OperationType, OperationProcessor], # NOW REQUIRED
):
"""Initialize with explicit processor mapping."""
self.processors = processors
@classmethod
def create(cls, time_scale: float = 1.0) -> OperationTransformer: # NEW FACTORY METHOD
"""Create OperationTransformer with standard processors.
Args:
time_scale: Multiplier for wait durations. 0.0 = skip, 1.0 = real time.
"""
processors = {
OperationType.STEP: StepProcessor(),
OperationType.WAIT: WaitProcessor(time_scale=time_scale), # PASS time_scale
OperationType.CONTEXT: ContextProcessor(),
OperationType.CALLBACK: CallbackProcessor(),
OperationType.EXECUTION: ExecutionProcessor(),
}
return cls(processors=processors)4. checkpoint/processors/wait.py - WaitProcessorclass WaitProcessor(OperationProcessor):
"""Processes WAIT operation updates with timer scheduling."""
def __init__(self, time_scale: float = 1.0): # NEW CONSTRUCTOR
"""Initialize WaitProcessor with time scale.
Args:
time_scale: Multiplier for wait durations. Can be overridden by
DURABLE_EXECUTION_TIME_SCALE environment variable.
"""
self._time_scale = time_scale
def process(self, update, current_op, notifier, execution_arn):
"""Process WAIT operation update."""
match update.action:
case OperationAction.START:
wait_seconds = update.wait_options.wait_seconds if update.wait_options else 0
# Environment variable overrides constructor parameter
time_scale = float(
os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale))
)
scaled_wait_seconds = wait_seconds * time_scale
# ... rest of implementation |
||
| WaitProcessor(time_scale=time_scale) | ||
| ) | ||
|
|
||
| execution_arn = self.run_async( | ||
| input=input, | ||
| timeout=timeout, | ||
|
|
@@ -907,8 +931,13 @@ def run( | |
| self, | ||
| input: str | None = None, # noqa: A002 | ||
| timeout: int = 60, | ||
| skip_time: bool = False, # noqa: ARG002 | ||
| ) -> DurableFunctionTestResult: | ||
| """Execute function on AWS Lambda and wait for completion.""" | ||
| """Execute function on AWS Lambda and wait for completion. | ||
|
|
||
| Note: skip_time parameter is ignored for cloud runner as timing is | ||
| controlled by the Lambda service. | ||
| """ | ||
| logger.info( | ||
| "Invoking Lambda function: %s (timeout: %ds)", self.function_name, timeout | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we only use time scaling for wait, but what about step retries?