Conversation
|
🤖 Emulator PR Created A draft PR has been created with locked dependencies: ➡️ https://github.com/aws/aws-durable-execution-emulator/pull/316 The emulator will build binaries using the exact testing SDK commit locked in uv.lock. |
a6b1215 to
91c7fdc
Compare
|
🔄 Emulator PR Updated The emulator PR has been updated with locked dependencies: ➡️ https://github.com/aws/aws-durable-execution-emulator/pull/316 |
91c7fdc to
9b4d310
Compare
|
🔄 Emulator PR Updated The emulator PR has been updated with locked dependencies: ➡️ https://github.com/aws/aws-durable-execution-emulator/pull/316 |
| function_name: str = "test-function", | ||
| execution_name: str = "execution-name", | ||
| account_id: str = "123456789012", | ||
| skip_time: bool = False, |
There was a problem hiding this comment.
rather than constrict this to a bool, why not have wait_multiplier or wait_scale available here - also this is what it maps to internally anyway?
There was a problem hiding this comment.
Yeah - I considered that but I think having skip_time is a nicer interface since that's what will be more commonly used for unit tests, don't you think? We could add both?
There was a problem hiding this comment.
there should rather be one obvious way of achieving this, rather than multiplying (ha :-) ) options.
I'd argue the bool removes functionality unnecessarily, and specifying wait_multiplier = 0 is not particularly harder than skip_time=false.
| """ | ||
| # Update time_scale in checkpoint processor for this run | ||
| time_scale = 0.0 if skip_time else 1.0 | ||
| self._checkpoint_processor._transformer.processors[OperationType.WAIT] = ( |
There was a problem hiding this comment.
this isn't quite in the spirit of the dependency injection patterns here - it's effectively monkey-patching the processor after initialization.
1. runner.py - DurableFunctionTestRunner.init()
def __init__(
self,
handler: Callable,
wait_multiplier: float = 1.0, # NEW PARAMETER
# ... other existing params
):
# ... existing code ...
# Pass wait_multiplier to CheckpointProcessor
self._checkpoint_processor = CheckpointProcessor(
store=self._store,
scheduler=self._scheduler,
time_scale=wait_multiplier, # NEW ARGUMENT
)2. checkpoint/processor.py - CheckpointProcessor.init()
def __init__(
self,
store: ExecutionStore,
scheduler: Scheduler,
time_scale: float = 1.0, # NEW PARAMETER
):
self._store = store
self._scheduler = scheduler
self._notifier = ExecutionNotifier()
# Use factory method with time_scale
self._transformer = OperationTransformer.create(time_scale=time_scale)3. checkpoint/transformer.py - OperationTransformer
class OperationTransformer:
"""Transforms OperationUpdates to Operations."""
def __init__(
self,
processors: MutableMapping[OperationType, OperationProcessor], # NOW REQUIRED
):
"""Initialize with explicit processor mapping."""
self.processors = processors
@classmethod
def create(cls, time_scale: float = 1.0) -> OperationTransformer: # NEW FACTORY METHOD
"""Create OperationTransformer with standard processors.
Args:
time_scale: Multiplier for wait durations. 0.0 = skip, 1.0 = real time.
"""
processors = {
OperationType.STEP: StepProcessor(),
OperationType.WAIT: WaitProcessor(time_scale=time_scale), # PASS time_scale
OperationType.CONTEXT: ContextProcessor(),
OperationType.CALLBACK: CallbackProcessor(),
OperationType.EXECUTION: ExecutionProcessor(),
}
return cls(processors=processors)4. checkpoint/processors/wait.py - WaitProcessor
class WaitProcessor(OperationProcessor):
"""Processes WAIT operation updates with timer scheduling."""
def __init__(self, time_scale: float = 1.0): # NEW CONSTRUCTOR
"""Initialize WaitProcessor with time scale.
Args:
time_scale: Multiplier for wait durations. Can be overridden by
DURABLE_EXECUTION_TIME_SCALE environment variable.
"""
self._time_scale = time_scale
def process(self, update, current_op, notifier, execution_arn):
"""Process WAIT operation update."""
match update.action:
case OperationAction.START:
wait_seconds = update.wait_options.wait_seconds if update.wait_options else 0
# Environment variable overrides constructor parameter
time_scale = float(
os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale))
)
scaled_wait_seconds = wait_seconds * time_scale
# ... rest of implementation| """Initialize WaitProcessor with time scale. | ||
|
|
||
| Args: | ||
| time_scale: Multiplier for wait durations. Use 0.0 to skip waits entirely. |
There was a problem hiding this comment.
Are there any side-effects with very small/zero wait times for Python? In the TS testing library I had to implement a custom queue scheduler to ensure that invocations completed when skipTime was enabled, so I'm not sure if Python would have the same issues.
(PR for that change: aws/aws-durable-execution-sdk-js#298)
There was a problem hiding this comment.
Seems like we only use time scaling for wait, but what about step retries?
Issue #, if available:
Closes #188
Description of changes:
Adding support for a
skip_timeparameter in the python test runner.Dependencies
If this PR requires testing against a specific branch of the Python Language SDK (e.g., for unreleased changes), uncomment and specify the branch below. Otherwise, leave commented to use the main branch.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.