From 5ca0d3ccd7b433137102c7941bc938352328d1aa Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Tue, 12 May 2026 12:02:39 -0400 Subject: [PATCH 1/2] Retry stack-info: PR: https://github.com/aws/aws-lambda-dotnet/pull/2363, branch: GarrettBeatty/stack/3 --- .../DurableExecutionException.cs | 20 ++ .../DurableFunction.cs | 3 +- .../IRetryStrategy.cs | 39 +++ .../Internal/CheckpointBatcher.cs | 13 +- .../Internal/CheckpointBatcherConfig.cs | 10 +- .../Internal/ExecutionState.cs | 22 +- .../Internal/OperationIdGenerator.cs | 13 +- .../Internal/StepOperation.cs | 238 +++++++++++--- .../Internal/TerminationManager.cs | 1 + .../Internal/WaitOperation.cs | 2 +- .../Operation.cs | 24 ++ .../RetryStrategy.cs | 201 ++++++++++++ .../StepConfig.cs | 15 +- .../AtMostOnceCrashReplayTest.cs | 81 +++++ .../DurableFunctionDeployment.cs | 36 ++- .../LongRetryChainTest.cs | 77 +++++ .../LongerWaitTest.cs | 5 +- .../MultipleStepsTest.cs | 5 +- .../ReplayDeterminismTest.cs | 5 +- .../RetryExhaustionTest.cs | 80 +++++ .../RetryTest.cs | 78 +++++ .../StepFailsTest.cs | 5 +- .../StepWaitStepTest.cs | 5 +- .../AtMostOnceCrashFunction.csproj | 18 ++ .../AtMostOnceCrashFunction/Dockerfile | 7 + .../AtMostOnceCrashFunction/Function.cs | 69 ++++ .../LongRetryChainFunction/Dockerfile | 7 + .../LongRetryChainFunction/Function.cs | 57 ++++ .../LongRetryChainFunction.csproj | 18 ++ .../RetryExhaustionFunction/Dockerfile | 7 + .../RetryExhaustionFunction/Function.cs | 47 +++ .../RetryExhaustionFunction.csproj | 18 ++ .../TestFunctions/RetryFunction/Dockerfile | 7 + .../TestFunctions/RetryFunction/Function.cs | 49 +++ .../RetryFunction/RetryFunction.csproj | 18 ++ .../DurableContextTests.cs | 302 ++++++++++++++++++ .../DurableFunctionTests.cs | 61 ++-- .../ExecutionStateTests.cs | 11 +- .../RetryStrategyTests.cs | 202 ++++++++++++ 39 files changed, 1756 insertions(+), 120 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs create mode 100644 
Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs create mode 100644 
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs index 0f724b4a2..bb95c8476 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs @@ -47,3 +47,23 @@ public StepException(string message) : base(message) { } /// Creates a wrapping an inner exception. public StepException(string message, Exception innerException) : base(message, innerException) { } } + +/// +/// Thrown when a step under is +/// detected to have been interrupted mid-execution on a prior invocation +/// (replay sees a STARTED checkpoint with no terminal record). +/// +/// +/// Surfaces in so user-supplied +/// strategies can distinguish "my code threw" from "a previous attempt +/// crashed before it could record a result". +/// +public class StepInterruptedException : StepException +{ + /// Creates an empty . + public StepInterruptedException() { } + /// Creates a with the given message. + public StepInterruptedException(string message) : base(message) { } + /// Creates a wrapping an inner exception. 
+ public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs index 85ee73040..e0d03747c 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs @@ -155,8 +155,7 @@ private static async Task WrapAsyncCoreGetExecutionStateAsync) propagate as /// too, but they are NOT caught here — they - /// flow up to the host so Lambda retries, matching Python's GetExecutionStateError - /// (which extends InvocationError). + /// flow up to the host so Lambda retries. /// /// User-code SDK errors (e.g. an SDK call inside a Step body) are caught by /// StepRunner and surfaced as StepException for the workflow's normal diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs new file mode 100644 index 000000000..f291bed1e --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs @@ -0,0 +1,39 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Determines whether a failed step should be retried and with what delay. +/// +public interface IRetryStrategy +{ + /// + /// Evaluates whether the given exception warrants a retry. + /// + /// The exception that caused the step to fail. + /// The 1-based attempt number that just failed. + /// A decision indicating whether to retry and the delay before the next attempt. + RetryDecision ShouldRetry(Exception exception, int attemptNumber); +} + +/// +/// The outcome of a retry evaluation. +/// +public readonly struct RetryDecision +{ + /// Whether the step should be retried. + public bool ShouldRetry { get; } + + /// The delay before the next retry attempt. 
+ public TimeSpan Delay { get; } + + private RetryDecision(bool shouldRetry, TimeSpan delay) + { + ShouldRetry = shouldRetry; + Delay = delay; + } + + /// Indicates the step should not be retried. + public static RetryDecision DoNotRetry() => new(false, TimeSpan.Zero); + + /// Indicates the step should be retried after the specified delay. + public static RetryDecision RetryAfter(TimeSpan delay) => new(true, delay); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs index 8039e7c56..e098606f0 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs @@ -11,13 +11,12 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// call awaits the flush of its containing batch (sync semantics). /// /// -/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when -/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need -/// a fire-and-forget overload like -/// Task EnqueueAsync(SdkOperationUpdate update, bool sync) where -/// sync=false returns as soon as the item is queued. Java's -/// sendOperationUpdate vs sendOperationUpdateAsync is the model. -/// Today every call site is sync, so the API stays minimal. +/// Fire-and-forget semantics are achieved by simply not awaiting the returned +/// Task. Errors still surface deterministically via _terminalError: the +/// next sync or rethrows. +/// Callers using fire-and-forget should observe the discarded Task's exception +/// (see StepOperation.FireAndForget) so it doesn't trip the runtime's +/// UnobservedTaskException event. 
/// internal sealed class CheckpointBatcher : IAsyncDisposable { diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs index a5e60b98e..2b7c0b2df 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs @@ -24,12 +24,10 @@ internal sealed class CheckpointBatcherConfig /// /// TODO: not enforced today. The worker only checks ; /// a single oversized item (or a batch whose serialized size exceeds 750 KB) - /// will be sent to the service and rejected there. Java/JS/Python all - /// pre-flight this on the in-flight batch and split before the next add. - /// Wire this in alongside the async-flush operations (Map / Parallel / - /// child-context) since those are the scenarios that can actually fill a - /// batch — today every batch is 1 item with - /// = Zero, so the gap is latent. + /// will be sent to the service and rejected there. Wire this in alongside + /// the async-flush operations (Map / Parallel / child-context) since those + /// are the scenarios that can actually fill a batch — today every batch is + /// 1 item with = Zero, so the gap is latent. /// internal int MaxBatchBytes { get; init; } = 750 * 1024; } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs index f936d3d24..ef27691a3 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs @@ -6,7 +6,6 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// ; this type is the inbound side only. 
/// /// -/// Replay tracking mirrors the Python / Java / JavaScript reference SDKs: /// /// At construction the workflow is "replaying" if and only if any user-replayable /// op is present. The service always sends one EXECUTION-type op @@ -41,11 +40,15 @@ public void LoadFromCheckpoint(InitialExecutionState? initialState) AddOperations(initialState.Operations); } - // Only user-replayable ops put us into replay mode. The service-side - // EXECUTION op (input payload bookkeeping) is always present and must - // not count — see Python execution.py:258 / Java ExecutionManager:81 / - // JS execution-context.ts:62 for the same rule. - (_isReplaying, _remainingReplayOps) = ScanReplayable(); + // We're "replaying" when there are completed ops (SUCCEEDED, FAILED, + // CANCELLED, STOPPED) we need to re-derive before resuming live work. + // The service-side EXECUTION op (input payload bookkeeping) is always + // present and doesn't count. If the only ops are in-progress + // (READY/PENDING/STARTED), there's nothing to re-derive — the next + // user call IS the next thing to run — so IsReplaying starts false. + var (_, terminalCount) = ScanReplayable(); + _remainingReplayOps = terminalCount; + _isReplaying = terminalCount > 0; } public void AddOperations(IEnumerable operations) @@ -91,8 +94,11 @@ public void TrackReplay(string operationId) public void ValidateReplayConsistency(string operationId, string expectedType, string? expectedName) { - if (!_isReplaying) return; - + // Independent of IsReplaying: as long as a checkpoint record exists + // for this id, its type/name must match what user code is asking for. + // If the only checkpointed ops are in-progress (PENDING/READY/STARTED), + // IsReplaying is false but the records still exist and code drift can + // still produce a mismatch. 
if (!_operations.TryGetValue(operationId, out var op)) return; if (op.Type != null && op.Type != expectedType) diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs index 4e9527d3c..4dd03d328 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs @@ -8,11 +8,10 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// /// Generates deterministic operation IDs for durable operations. Each call /// increments an internal counter and SHA-256 hashes "<parentId>-<counter>" -/// (or just "<counter>" at the root). Hashing matches the wire format -/// used by the Java/JS/Python SDKs so the same workflow position produces a -/// stable, opaque ID across replays — and the human-readable step name is -/// carried separately on OperationUpdate.Name, so renaming a step does -/// not break replay correlation. +/// (or just "<counter>" at the root). The same workflow position +/// produces a stable, opaque ID across replays — and the human-readable step +/// name is carried separately on OperationUpdate.Name, so renaming a +/// step does not break replay correlation. /// internal sealed class OperationIdGenerator { @@ -46,7 +45,7 @@ public OperationIdGenerator(string? parentId) /// /// Generates the next operation ID. The counter is pre-incremented so the - /// first ID is hash("1"), matching the reference SDKs. + /// first ID is hash("1"). /// /// /// Uses so concurrent callers @@ -55,7 +54,7 @@ public OperationIdGenerator(string? parentId) /// MapAsync branches that fan out before awaiting) cannot collide /// on the same ID. Determinism still requires that calls happen in a /// deterministic order — atomicity prevents duplicate IDs but not - /// reordering between replays. Matches Java's AtomicInteger.incrementAndGet. 
+ /// reordering between replays. /// public string NextId() { diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs index 42c9e3461..ffdaaffd7 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs @@ -5,20 +5,27 @@ using Microsoft.Extensions.Logging; using SdkErrorObject = Amazon.Lambda.Model.ErrorObject; using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; +using SdkStepOptions = Amazon.Lambda.Model.StepOptions; namespace Amazon.Lambda.DurableExecution.Internal; /// -/// Durable step operation. Runs the user's function once across the lifetime -/// of a durable execution, persisting its result so subsequent invocations -/// replay the cached value without re-executing. +/// Durable step operation. Runs the user's function (with retry support), +/// persisting its result so subsequent invocations replay the cached value +/// without re-executing. /// /// -/// Replay semantics — example: await ctx.StepAsync(ChargeCard, "charge") +/// Replay branches — example: await ctx.StepAsync(ChargeCard, "charge") /// -/// Fresh: no prior state → run func → emit SUCCEED → return result. -/// Replay (SUCCEEDED): return cached result; func is NOT re-executed. -/// Replay (FAILED): re-throw the recorded exception. +/// Fresh: no prior state → run func → emit SUCCEED → return. +/// SUCCEEDED: return cached result; func is NOT re-executed. +/// FAILED: re-throw the recorded exception. +/// PENDING (retry timer not yet fired): re-suspend without +/// running func; service re-invokes once NextAttemptTimestamp elapses. +/// STARTED + AtMostOncePerRetry: crash recovery — treat as a +/// failed attempt, route through retry strategy. +/// READY: service has post-PENDING re-invoked us; the retry +/// timer fired and the next attempt is up. Run it. 
/// /// Serialization is delegated to the registered on /// . AOT-safe and reflection-based callers @@ -55,7 +62,7 @@ public StepOperation( protected override string OperationType => OperationTypes.Step; protected override Task StartAsync(CancellationToken cancellationToken) - => ExecuteFunc(cancellationToken); + => ExecuteFunc(attemptNumber: 1, cancellationToken); protected override Task ReplayAsync(Operation existing, CancellationToken cancellationToken) { @@ -71,31 +78,126 @@ protected override Task ReplayAsync(Operation existing, CancellationToken can // user's catch-block flow matches the original execution. throw CreateStepException(existing); + case OperationStatuses.Pending: + return ReplayPending(existing, cancellationToken); + + case OperationStatuses.Started: + return ReplayStarted(existing, cancellationToken); + + case OperationStatuses.Ready: + return ReplayReady(existing, cancellationToken); + default: - // STARTED/READY/PENDING from a prior invocation — no retry logic - // in this commit, so fall through and execute fresh. (Future work - // on retries will replace this default with explicit arms.) - return ExecuteFunc(cancellationToken); + // CANCELLED / STOPPED / unrecognized status. Re-running the + // step would re-execute side effects and silently mask a + // service-state we don't know how to interpret. Fail loud. + throw new NonDeterministicExecutionException( + $"Step operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); } } - private async Task ExecuteFunc(CancellationToken cancellationToken) + /// + /// READY means the service has post-PENDING re-invoked us — the retry + /// timer fired and the step is eligible to run its next attempt. No + /// timer check is needed (the service has already decided we're up); + /// just advance the attempt counter and execute. + /// + private Task ReplayReady(Operation ready, CancellationToken cancellationToken) + { + var attemptNumber = (ready.StepDetails?.Attempt ?? 
0) + 1; + return ExecuteFunc(attemptNumber, cancellationToken); + } + + /// + /// PENDING means a retry was scheduled (RETRY checkpoint). If + /// NextAttemptTimestamp is in the future, re-suspend; otherwise the timer + /// has fired and we run the next attempt. + /// + private Task ReplayPending(Operation pending, CancellationToken cancellationToken) + { + var nextAttemptTs = pending.StepDetails?.NextAttemptTimestamp; + var attemptNumber = (pending.StepDetails?.Attempt ?? 0) + 1; + + if (nextAttemptTs is { } scheduledMs && + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < scheduledMs) + { + // Retry timer hasn't fired yet — re-suspend so we don't bill compute + // while the timer ticks. Service re-invokes once the timer elapses. + return Termination.SuspendAndAwait( + TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}"); + } + + return ExecuteFunc(attemptNumber, cancellationToken); + } + + /// + /// STARTED means a START checkpoint was written but no SUCCEED/FAIL exists. + /// For AtMostOncePerRetry this signals a crash mid-step — treat as failure + /// and route through retry. For AtLeastOncePerRetry just re-execute. + /// + private Task ReplayStarted(Operation started, CancellationToken cancellationToken) + { + var attemptNumber = (started.StepDetails?.Attempt ?? 0) + 1; + + if (_config?.Semantics == StepSemantics.AtMostOncePerRetry) + { + // Re-running func would risk a duplicate side effect (e.g. double + // charge). Treat the lost result as a failure; let the retry + // strategy decide whether to try again or give up. + // + // Surface as StepInterruptedException so user strategies can + // distinguish "my code threw" from "a prior attempt crashed before + // recording a terminal record". + var error = started.StepDetails?.Error; + var ex = error != null + ? new StepInterruptedException(error.ErrorMessage ?? 
"Step failed on previous attempt") { ErrorType = error.ErrorType } + : new StepInterruptedException("Step result lost during AtMostOncePerRetry replay"); + return HandleStepFailureAsync(ex, attemptNumber, cancellationToken); + } + + return ExecuteFunc(attemptNumber, cancellationToken); + } + + private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - // TODO: emit a STEP_STARTED checkpoint (action = "START") here when retries - // and/or AtMostOncePerRetry semantics land. AtMostOncePerRetry needs the - // START to be sync-flushed before user code runs (so replay can detect - // "we already attempted this and must not re-run"). AtLeastOncePerRetry - // wants it fire-and-forget for telemetry (attempt timing, retry count in - // history). Both require the async-flush overload in CheckpointBatcher - // (see TODO in CheckpointBatcher.cs). Today neither feature is wired up, - // so the START is intentionally omitted — SUCCEED alone is sufficient - // for replay correctness in the AtLeastOncePerRetry-only world this PR - // ships. Java SDK precedent: StepOperation.checkpointStarted(). + // Emit a START checkpoint before running user code, unless we're already + // resuming a STARTED record (which means an earlier attempt wrote it). + // + // AtMostOncePerRetry: SYNC flush. If Lambda crashes before SUCCEED is + // flushed, ReplayStarted routes through retry instead of re-executing. + // A queued-but-unflushed START is indistinguishable from "never ran" if + // we die, so the sync flush is correctness-load-bearing here. + // + // AtLeastOncePerRetry (default): FIRE-AND-FORGET. Replay correctness + // doesn't depend on the START — SUCCEED alone is sufficient — so this + // is purely telemetry (attempt timing, retry count visible in history). 
+ if (State.GetOperation(OperationId)?.Status != OperationStatuses.Started) + { + var startUpdate = new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = OperationAction.START, + SubType = OperationSubTypes.Step, + Name = Name + }; + + if (_config?.Semantics == StepSemantics.AtMostOncePerRetry) + { + await EnqueueAsync(startUpdate, cancellationToken); + } + else + { + FireAndForget(EnqueueAsync(startUpdate, cancellationToken)); + } + } + + try { - var stepContext = new StepContext(OperationId, attemptNumber: 1, _logger); + var stepContext = new StepContext(OperationId, attemptNumber, _logger); var result = await _func(stepContext); await EnqueueAsync(new SdkOperationUpdate @@ -103,7 +205,7 @@ await EnqueueAsync(new SdkOperationUpdate Id = OperationId, Type = OperationTypes.Step, Action = OperationAction.SUCCEED, - SubType = "Step", + SubType = OperationSubTypes.Step, Name = Name, Payload = SerializeResult(result) }, cancellationToken); @@ -116,24 +218,64 @@ await EnqueueAsync(new SdkOperationUpdate } catch (Exception ex) { - // No retry logic in this commit: any thrown exception becomes a - // FAIL checkpoint and is re-thrown as a StepException. On replay, - // the FAILED branch above will re-throw without re-executing. - await EnqueueAsync(new SdkOperationUpdate - { - Id = OperationId, - Type = OperationTypes.Step, - Action = OperationAction.FAIL, - SubType = "Step", - Name = Name, - Error = ToSdkError(ex) - }, cancellationToken); + // Funnel into the retry/fail decision tree. May checkpoint RETRY and + // suspend (Pending), or checkpoint FAIL and rethrow to user. + return await HandleStepFailureAsync(ex, attemptNumber, cancellationToken); + } + } - throw new StepException(ex.Message, ex) + /// + /// Funnels a step failure into the retry/fail decision. May checkpoint + /// RETRY and suspend (Pending), or checkpoint FAIL and rethrow. 
+ /// + private async Task HandleStepFailureAsync(Exception ex, int attemptNumber, CancellationToken cancellationToken) + { + var retryStrategy = _config?.RetryStrategy; + if (retryStrategy != null) + { + var decision = retryStrategy.ShouldRetry(ex, attemptNumber); + if (decision.ShouldRetry) { - ErrorType = ex.GetType().FullName - }; + // Service requires NextAttemptDelaySeconds >= 1. Built-in + // strategies already produce >=1s delays; this guard only + // matters for user-supplied IRetryStrategy / FromDelegate. + var requestedSeconds = decision.Delay.TotalSeconds; + var delaySeconds = (int)Math.Max(1, Math.Ceiling(requestedSeconds)); + if (requestedSeconds < 1) + { + _logger.LogWarning( + "Retry delay for step '{StepName}' attempt {Attempt} was {Requested:F3}s (< 1s); coerced to {Coerced}s.", + Name ?? OperationId, attemptNumber, requestedSeconds, delaySeconds); + } + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = OperationAction.RETRY, + SubType = OperationSubTypes.Step, + Name = Name, + Error = ToSdkError(ex), + StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds } + }, cancellationToken); + return await Termination.SuspendAndAwait( + TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}"); + } } + + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = OperationAction.FAIL, + SubType = OperationSubTypes.Step, + Name = Name, + Error = ToSdkError(ex) + }, cancellationToken); + + throw new StepException(ex.Message, ex) + { + ErrorType = ex.GetType().FullName + }; } private T DeserializeResult(string? 
serialized) @@ -168,4 +310,20 @@ private static StepException CreateStepException(Operation failedOp) ErrorMessage = ex.Message, StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList() }; + + /// + /// Discards a Task but observes any exception so it doesn't surface as an + /// UnobservedTaskException. Used for fire-and-forget START checkpoints + /// under AtLeastOncePerRetry semantics. The actual error still propagates + /// via CheckpointBatcher._terminalError: the next sync EnqueueAsync + /// or DrainAsync will rethrow with the original cause. + /// + private static void FireAndForget(Task task) + { + _ = task.ContinueWith( + static t => _ = t.Exception, + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs index 1350c3d70..5d61e611b 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs @@ -6,6 +6,7 @@ namespace Amazon.Lambda.DurableExecution.Internal; internal enum TerminationReason { WaitScheduled, + RetryScheduled, CallbackPending, InvokePending, CheckpointFailed diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs index e8351c120..2c1325974 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs @@ -49,7 +49,7 @@ await EnqueueAsync(new SdkOperationUpdate Id = OperationId, Type = OperationTypes.Wait, Action = OperationAction.START, - SubType = "Wait", + SubType = OperationSubTypes.Wait, Name = Name, WaitOptions = new SdkWaitOptions { 
WaitSeconds = _waitSeconds } }, cancellationToken); diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs index 7237eef87..95b6a35b8 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs @@ -167,6 +167,30 @@ public static class OperationTypes public const string Execution = "EXECUTION"; } +/// +/// Wire-format string constants. SubType is a +/// finer-grained classifier sent alongside for +/// observability — the values are PascalCase ("Step", "Wait") and distinct +/// from the uppercase values. +/// +public static class OperationSubTypes +{ + /// Step sub-type. + public const string Step = "Step"; + + /// Wait sub-type. + public const string Wait = "Wait"; + + /// Callback sub-type. + public const string Callback = "Callback"; + + /// Chained-invoke sub-type. + public const string ChainedInvoke = "ChainedInvoke"; + + /// Child-context sub-type. + public const string Context = "Context"; +} + /// /// Wire-format string constants. /// Plural name avoids collision with Amazon.Lambda.OperationStatus. diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs new file mode 100644 index 000000000..9cb6edbde --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs @@ -0,0 +1,201 @@ +using System.Text.RegularExpressions; + +namespace Amazon.Lambda.DurableExecution; + +/// +/// Jitter strategy for exponential backoff to prevent thundering-herd scenarios. +/// +public enum JitterStrategy +{ + /// No randomization — delay is exactly the calculated backoff value. + None, + /// Random delay between 0 and the calculated backoff value (recommended). + Full, + /// Random delay between 50% and 100% of the calculated backoff value. 
+ Half +} +

/// <summary>
/// Controls whether a step re-executes if the Lambda is re-invoked mid-attempt.
/// </summary>
public enum StepSemantics
{
    /// <summary>
    /// Default. The step may re-execute if the Lambda is re-invoked during execution.
    /// Use for idempotent operations.
    /// </summary>
    AtLeastOncePerRetry,

    /// <summary>
    /// The step executes at most once per retry attempt. A START checkpoint is written
    /// before execution; on replay with an existing START, the SDK skips re-execution
    /// and proceeds to the retry handler.
    /// </summary>
    AtMostOncePerRetry
}

/// <summary>
/// Factory methods for common retry strategies.
/// </summary>
public static class RetryStrategy
{
    /// <summary>6 attempts, 2x backoff, 5s initial delay, 60s max, Full jitter.</summary>
    public static IRetryStrategy Default { get; } = Exponential(
        maxAttempts: 6,
        initialDelay: TimeSpan.FromSeconds(5),
        maxDelay: TimeSpan.FromSeconds(60),
        backoffRate: 2.0,
        jitter: JitterStrategy.Full);

    /// <summary>3 attempts, 2x backoff, 1s initial delay, 5s max, Half jitter.</summary>
    public static IRetryStrategy Transient { get; } = Exponential(
        maxAttempts: 3,
        initialDelay: TimeSpan.FromSeconds(1),
        maxDelay: TimeSpan.FromSeconds(5),
        backoffRate: 2.0,
        jitter: JitterStrategy.Half);

    /// <summary>No retry — 1 attempt only.</summary>
    public static IRetryStrategy None { get; } = Exponential(maxAttempts: 1);

    /// <summary>
    /// Creates an exponential backoff retry strategy.
    /// </summary>
    /// <param name="maxAttempts">Total number of attempts, including the first one.</param>
    /// <param name="initialDelay">Delay before the first retry; 5 seconds when null.</param>
    /// <param name="maxDelay">Upper bound applied to the backoff delay; 300 seconds when null.</param>
    /// <param name="backoffRate">Multiplier applied to the delay for each further attempt.</param>
    /// <param name="jitter">Jitter applied to the capped delay.</param>
    /// <param name="retryableExceptions">
    /// Exception types (including derived types) that may be retried.
    /// </param>
    /// <param name="retryableMessagePatterns">
    /// Regular expressions matched against the exception message. When both filters are
    /// null every exception is retryable; otherwise a match on either filter is enough.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown if <paramref name="maxAttempts"/> &lt; 1, <paramref name="backoffRate"/> &lt; 1,
    /// <paramref name="initialDelay"/> is non-positive, <paramref name="maxDelay"/> is non-positive,
    /// or <paramref name="initialDelay"/> &gt; <paramref name="maxDelay"/>.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// Thrown if <paramref name="retryableExceptions"/> contains a null entry.
    /// </exception>
    public static IRetryStrategy Exponential(
        int maxAttempts = 3,
        TimeSpan? initialDelay = null,
        TimeSpan? maxDelay = null,
        double backoffRate = 2.0,
        JitterStrategy jitter = JitterStrategy.Full,
        Type[]? retryableExceptions = null,
        string[]? retryableMessagePatterns = null)
    {
        return new ExponentialRetryStrategy(
            maxAttempts,
            initialDelay ?? TimeSpan.FromSeconds(5),
            maxDelay ?? TimeSpan.FromSeconds(300),
            backoffRate,
            jitter,
            retryableExceptions,
            retryableMessagePatterns);
    }

    /// <summary>
    /// Creates a retry strategy from a delegate.
    /// </summary>
    /// <param name="strategy">
    /// Callback receiving the failure and the attempt number, returning the retry decision.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="strategy"/> is null.</exception>
    public static IRetryStrategy FromDelegate(Func<Exception, int, RetryDecision> strategy)
    {
        if (strategy == null) throw new ArgumentNullException(nameof(strategy));
        return new DelegateRetryStrategy(strategy);
    }
}

/// <summary>
/// Exponential backoff with an attempt limit, a delay cap, optional jitter and optional
/// exception filters (by type and/or by message pattern).
/// </summary>
internal sealed class ExponentialRetryStrategy : IRetryStrategy
{
    private readonly int _maxAttempts;
    private readonly TimeSpan _initialDelay;
    private readonly TimeSpan _maxDelay;
    private readonly double _backoffRate;
    private readonly JitterStrategy _jitter;
    private readonly Type[]? _retryableExceptions;
    private readonly Regex[]? _retryableMessagePatterns;

    public ExponentialRetryStrategy(
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        double backoffRate,
        JitterStrategy jitter,
        Type[]? retryableExceptions,
        string[]? retryableMessagePatterns)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts), maxAttempts, "must be >= 1");
        if (initialDelay <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(initialDelay), initialDelay, "must be > 0");
        if (maxDelay <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(maxDelay), maxDelay, "must be > 0");
        if (initialDelay > maxDelay)
            throw new ArgumentOutOfRangeException(nameof(initialDelay), initialDelay, $"must be <= maxDelay ({maxDelay})");
        if (backoffRate < 1.0 || double.IsNaN(backoffRate) || double.IsInfinity(backoffRate))
            throw new ArgumentOutOfRangeException(nameof(backoffRate), backoffRate, "must be a finite value >= 1.0");

        // Reject null filter entries up front: otherwise the failure would surface as a
        // NullReferenceException inside IsRetryable, i.e. while a step failure is being
        // handled, and would hide the exception that triggered the retry decision.
        if (retryableExceptions != null && retryableExceptions.Any(t => t is null))
            throw new ArgumentException("must not contain null entries", nameof(retryableExceptions));

        _maxAttempts = maxAttempts;
        _initialDelay = initialDelay;
        _maxDelay = maxDelay;
        _backoffRate = backoffRate;
        _jitter = jitter;

        // Copy so that later mutation of the caller's array cannot change this strategy.
        _retryableExceptions = retryableExceptions?.ToArray();
        _retryableMessagePatterns = retryableMessagePatterns?
            .Select(p => new Regex(p, RegexOptions.Compiled))
            .ToArray();
    }

    /// <summary>
    /// Stops once <paramref name="attemptNumber"/> reaches the attempt limit or the exception
    /// does not pass the filters; otherwise asks for a retry after the computed delay.
    /// </summary>
    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
    {
        if (attemptNumber >= _maxAttempts)
            return RetryDecision.DoNotRetry();

        if (!IsRetryable(exception))
            return RetryDecision.DoNotRetry();

        var delay = CalculateDelay(attemptNumber);
        return RetryDecision.RetryAfter(delay);
    }

    /// <summary>
    /// True when no filter is configured, or when the exception matches the type filter
    /// or its message matches one of the patterns.
    /// </summary>
    private bool IsRetryable(Exception exception)
    {
        if (_retryableExceptions == null && _retryableMessagePatterns == null)
            return true;

        if (_retryableExceptions != null)
        {
            var exType = exception.GetType();
            if (_retryableExceptions.Any(t => t.IsAssignableFrom(exType)))
                return true;
        }

        if (_retryableMessagePatterns != null)
        {
            var message = exception.Message;
            if (_retryableMessagePatterns.Any(p => p.IsMatch(message)))
                return true;
        }

        return false;
    }

    /// <summary>
    /// Computes initialDelay * backoffRate^(attemptNumber - 1), capped at maxDelay, applies
    /// jitter, then rounds up to whole seconds with a floor of one second.
    /// </summary>
    internal TimeSpan CalculateDelay(int attemptNumber)
    {
        var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1);
        var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds);

        var finalDelay = _jitter switch
        {
            // Full: uniformly random in [0, capped). Half: uniformly random in [capped/2, capped).
            JitterStrategy.Full => Random.Shared.NextDouble() * cappedDelay,
            JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.Shared.NextDouble()),
            _ => cappedDelay
        };

        return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
    }
}

/// <summary>
/// Adapts a user-supplied callback to <see cref="IRetryStrategy"/>.
/// </summary>
internal sealed class DelegateRetryStrategy : IRetryStrategy
{
    private readonly Func<Exception, int, RetryDecision> _strategy;

    public DelegateRetryStrategy(Func<Exception, int, RetryDecision> strategy)
    {
        _strategy = strategy ?? throw new ArgumentNullException(nameof(strategy));
    }

    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
        => _strategy(exception, attemptNumber);
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs index 2380967de..362867c09 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs @@ -5,9 +5,14 @@ namespace Amazon.Lambda.DurableExecution; /// public sealed class StepConfig { - // TODO: Retry support is deferred to a follow-up PR. When added, this is - // where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry) - // will live. The follow-up needs to use service-mediated retries (checkpoint - // a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay - // loop, to avoid billing Lambda compute time during retry backoff. + /// + /// Retry strategy for failed steps. When null (default), failures are not retried. + /// + public IRetryStrategy? RetryStrategy { get; set; } + + /// + /// Controls whether a step may re-execute if the Lambda is re-invoked mid-attempt. + /// Default is . 
+ /// + public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs new file mode 100644 index 000000000..7bae660f0 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs @@ -0,0 +1,81 @@
using System.Linq;
using System.Text;
using Amazon.Lambda.Model;
using Xunit;
using Xunit.Abstractions;

namespace Amazon.Lambda.DurableExecution.IntegrationTests;

public class AtMostOnceCrashReplayTest
{
    private readonly ITestOutputHelper _output;

    public AtMostOnceCrashReplayTest(ITestOutputHelper output)
    {
        _output = output;
    }

    /// <summary>
    /// Validates the AtMostOncePerRetry crash-recovery wire path: the Lambda
    /// process is killed mid-step on attempt 1 (after START flush, before
    /// SUCCEED). On re-invocation the SDK sees a STARTED checkpoint with no
    /// terminal record and routes through the retry strategy rather than
    /// re-executing the step. Attempt 2 succeeds.
    /// <para>
    /// This is the only path that exercises the StepInterruptedException
    /// synthesis — the unit-test analogue
    /// (StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler) fakes the
    /// STARTED state in-memory and never proves the service actually delivers
    /// it on a real crash.
    /// </para>
    /// </summary>
    [Fact]
    public async Task AtMostOnce_StepCrashesMidExecution_RecoversViaRetry()
    {
        var functionDir = DurableFunctionDeployment.FindTestFunctionDir("AtMostOnceCrashFunction");
        await using var deployment = await DurableFunctionDeployment.CreateAsync(
            functionDir, "amocrash", _output);

        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
        var payloadText = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
        _output.WriteLine($"Response: {payloadText}");

        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
        Assert.NotNull(arn);

        // Budget covers the 2s retry delay, the cold start of the first attempt
        // and the recovery invocation, with generous headroom.
        var finalStatus = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
        Assert.Equal("SUCCEEDED", finalStatus, ignoreCase: true);

        var history = await deployment.WaitForHistoryAsync(
            arn!,
            h => (h.Events?.Any(e => e.StepSucceededDetails != null && e.Name == "crash_then_recover") ?? false),
            TimeSpan.FromSeconds(60));

        // NOTE(review): the element type was lost in extraction; Event (Amazon.Lambda.Model)
        // is assumed here — confirm against the SDK model.
        var events = history.Events ?? new List<Event>();

        // Attempt 1 crashed before SUCCEED and attempt 2 recovered, so exactly one
        // StepSucceeded is expected and it carries "recovered on attempt 2".
        var recovered = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "crash_then_recover");
        Assert.NotNull(recovered);
        Assert.Equal("\"recovered on attempt 2\"", recovered!.StepSucceededDetails.Result?.Payload);

        // One StepStarted per invocation, hence at least two.
        var startedCount = events.Count(e => e.EventType == EventType.StepStarted);
        Assert.True(
            startedCount >= 2,
            "Expected at least 2 StepStarted events (attempt 1 crashed, attempt 2 recovered).");

        // The crash-recovery branch records the synthesized StepInterruptedException
        // as a StepFailed event for attempt 1; its message identifies the lost
        // attempt rather than a user exception type.
        var failureMessages = events
            .Where(e => e.StepFailedDetails != null && e.Name == "crash_then_recover")
            .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
            .ToList();
        Assert.NotEmpty(failureMessages);
        Assert.Contains(failureMessages, m => m.Contains("Step result lost", StringComparison.OrdinalIgnoreCase)
            || m.Contains("interrupted", StringComparison.OrdinalIgnoreCase)
            || m.Contains("previous attempt", StringComparison.OrdinalIgnoreCase));

        // The execution must have crossed at least one invocation boundary,
        // otherwise replay was never exercised.
        var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList();
        Assert.True(
            invocations.Count >= 2,
            $"Expected at least 2 InvocationCompleted events (proves crash + replay), got {invocations.Count}");
    }
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs index 8b5bb2e1b..b2ba4bb1a 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs @@ -263,8 +263,16 @@ public async Task WaitForHistoryAsync( { last = await GetHistoryAsync(durableExecutionArn, includeExecutionData); var eventCount = last.Events?.Count ?? 0; - _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events"); - if (predicate(last)) return last; + var typeCounts = last.Events? + .GroupBy(e => e.EventType?.Value ?? "") + .Select(g => $"{g.Key}:{g.Count()}") + .OrderBy(s => s); + _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events [{string.Join(",", typeCounts ?? 
Enumerable.Empty())}]"); + if (predicate(last)) + { + DumpEvents(last); + return last; + } } catch (Exception ex) { @@ -274,9 +282,21 @@ public async Task WaitForHistoryAsync( } _output.WriteLine($"[WaitForHistory] gave up after {attempt} attempts; returning last response with {last?.Events?.Count ?? 0} events"); + if (last != null) DumpEvents(last); return last ?? throw new TimeoutException($"GetDurableExecutionHistory never succeeded within {timeout.TotalSeconds}s"); } + private void DumpEvents(GetDurableExecutionHistoryResponse history) + { + var events = history.Events ?? new List(); + _output.WriteLine($"[WaitForHistory] event dump ({events.Count} total):"); + for (int i = 0; i < events.Count; i++) + { + var e = events[i]; + _output.WriteLine($" [{i}] type={e.EventType?.Value ?? ""} name={e.Name ?? ""} ts={e.EventTimestamp:O}"); + } + } + public string? ExtractDurableExecutionArn(string responsePayload) { try @@ -375,14 +395,18 @@ await Task.WhenAny( var stdout = await stdoutTask; var stderr = await stderrTask; - if (!string.IsNullOrWhiteSpace(stdout)) - _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}"); - if (process.ExitCode != 0) { + // Dump the FULL streams on failure — diagnosing build errors with + // truncated output is painful, and these only fire on test failure. + _output.WriteLine($"stdout: {stdout}"); _output.WriteLine($"stderr: {stderr}"); - throw new Exception($"{fileName} failed (exit {process.ExitCode}): {stderr}"); + var detail = !string.IsNullOrWhiteSpace(stderr) ? 
stderr : stdout; + throw new Exception($"{fileName} failed (exit {process.ExitCode}): {detail}"); } + + if (!string.IsNullOrWhiteSpace(stdout)) + _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}"); } public async ValueTask DisposeAsync() diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs new file mode 100644 index 000000000..105689ecc --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs @@ -0,0 +1,77 @@
using System.Linq;
using System.Text;
using Amazon.Lambda.Model;
using Xunit;
using Xunit.Abstractions;

namespace Amazon.Lambda.DurableExecution.IntegrationTests;

public class LongRetryChainTest
{
    private readonly ITestOutputHelper _output;

    public LongRetryChainTest(ITestOutputHelper output)
    {
        _output = output;
    }

    /// <summary>
    /// Long retry chain across many invocations: step fails 5 times before
    /// succeeding on attempt 6. Validates that StepDetails.Attempt increments
    /// monotonically across invocations (no off-by-one, no skipped attempts)
    /// and that IStepContext.AttemptNumber on the user side matches the wire
    /// value on each attempt.
    /// </summary>
    [Fact]
    public async Task FailsFiveTimesThenSucceeds_AttemptCounterIsMonotonic()
    {
        var functionDir = DurableFunctionDeployment.FindTestFunctionDir("LongRetryChainFunction");
        await using var deployment = await DurableFunctionDeployment.CreateAsync(
            functionDir, "longretry", _output);

        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
        var payloadText = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
        _output.WriteLine($"Response: {payloadText}");

        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
        Assert.NotNull(arn);

        // Retry delays add up to 1+2+3+4+5 = 15s; the timeout leaves generous headroom.
        var finalStatus = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(180));
        Assert.Equal("SUCCEEDED", finalStatus, ignoreCase: true);

        var history = await deployment.WaitForHistoryAsync(
            arn!,
            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 6
                && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
            TimeSpan.FromSeconds(60));

        // NOTE(review): the element type was lost in extraction; Event (Amazon.Lambda.Model)
        // is assumed here — confirm against the SDK model.
        var events = history.Events ?? new List<Event>();

        // Six attempts in total: five failures followed by one success.
        Assert.Equal(6, events.Count(e => e.EventType == EventType.StepStarted));
        Assert.Equal(5, events.Count(e => e.StepFailedDetails != null && e.Name == "long_retry_step"));
        var winner = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "long_retry_step");
        Assert.NotNull(winner);

        // The winning attempt reported AttemptNumber 6, so the user-facing counter
        // follows the wire attempt counter across invocations, not just within one.
        Assert.Equal("\"ok on attempt 6\"", winner!.StepSucceededDetails.Result?.Payload);

        // Every failure has its own per-attempt message: the user-side counter moved
        // exactly once per invocation, with no duplicates and no gaps.
        var failureMessages = events
            .Where(e => e.StepFailedDetails != null && e.Name == "long_retry_step")
            .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
            .ToList();
        Assert.Equal(5, failureMessages.Count);
        foreach (var attempt in Enumerable.Range(1, 5))
        {
            Assert.Contains(failureMessages, m => m.Contains($"attempt {attempt}"));
        }

        // The chain ran across several invocations: the service re-invoked the
        // function between retries instead of keeping one Lambda alive for all six.
        var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList();
        Assert.True(
            invocations.Count >= 5,
            $"Expected at least 5 InvocationCompleted events (one per retry boundary), got {invocations.Count}");
    }
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs index 0592d0d44..bfc2913ed 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs @@ -30,11 +30,14 @@ public async Task LongerWait_ExpiresAndCompletes() var history = await deployment.WaitForHistoryAsync( arn!, - h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 + h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2 + && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false), TimeSpan.FromSeconds(60)); var events = history.Events ?? new List(); + Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted)); + // Steps before and after the wait both ran, with the post-wait step seeing // the pre-wait step's value via replay. var stepResults = events diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs index 573ecc082..6b0ae0bc7 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs @@ -32,10 +32,13 @@ public async Task MultipleSteps_AllCheckpointed() // all events are indexed. Wait until we see all 5 step-succeeded events. var history = await deployment.WaitForHistoryAsync( arn!, - h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 
0) >= 5, + h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 5 + && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5, TimeSpan.FromSeconds(60)); var events = history.Events ?? new List(); + Assert.Equal(5, events.Count(e => e.EventType == EventType.StepStarted)); + // Each step ran exactly once (no replay-induced duplicates) in declaration order, // and each step's output chained from the previous one. var stepResults = events diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs index 0fd7aa569..137bb28b8 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs @@ -31,10 +31,13 @@ public async Task ReplayDeterminism_SameGuidAcrossInvocations() // History is eventually consistent — wait until both step-succeeded events are visible. var history = await deployment.WaitForHistoryAsync( arn!, - h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2, + h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2 + && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2, TimeSpan.FromSeconds(60)); var events = history.Events ?? new List(); + Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted)); + // Each step succeeded exactly once — generate_id was NOT re-executed on replay // (a duplicate would show up as two succeeded events for the same name). 
var stepSucceededEvents = events.Where(e => e.StepSucceededDetails != null).ToList(); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs new file mode 100644 index 000000000..c8ab4d85b --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs @@ -0,0 +1,80 @@
using System.Linq;
using System.Text;
using Amazon.Lambda.Model;
using Xunit;
using Xunit.Abstractions;

namespace Amazon.Lambda.DurableExecution.IntegrationTests;

public class RetryExhaustionTest
{
    private readonly ITestOutputHelper _output;
    public RetryExhaustionTest(ITestOutputHelper output) => _output = output;

    /// <summary>
    /// End-to-end retry exhaustion: step always throws, maxAttempts=3.
    /// Validates that the SDK records exactly three StepStarted/StepFailed pairs,
    /// the final attempt produces a FAIL checkpoint (not RETRY), and the workflow
    /// terminates FAILED with the original exception surfaced through the
    /// execution-level error.
    /// </summary>
    [Fact]
    public async Task AlwaysFailsStep_ExhaustsRetries_TerminatesFailed()
    {
        await using var deployment = await DurableFunctionDeployment.CreateAsync(
            DurableFunctionDeployment.FindTestFunctionDir("RetryExhaustionFunction"),
            "rexhaust", _output);

        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
        _output.WriteLine($"Response: {responsePayload}");

        // Failed workflows return null payload synchronously; locate the execution by name.
        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
        Assert.NotNull(arn);

        // 2s + 4s of retry delays + 3x execution overhead. Generous headroom for scheduling.
        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
        Assert.Equal("FAILED", status, ignoreCase: true);

        // Execution-level error is the original exception from the final attempt.
        var execution = await deployment.GetExecutionAsync(arn!);
        Assert.NotNull(execution.Error);
        Assert.Contains("attempt 3", execution.Error.ErrorMessage);

        var history = await deployment.WaitForHistoryAsync(
            arn!,
            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
                && (h.Events?.Count(e => e.StepFailedDetails != null) ?? 0) >= 3,
            TimeSpan.FromSeconds(60));

        // NOTE(review): the element type was lost in extraction; Event (Amazon.Lambda.Model)
        // is assumed here — confirm against the SDK model.
        var events = history.Events ?? new List<Event>();

        // Three attempts ran in total — no extra (off-by-one) and no truncation.
        Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));

        // Three failures recorded; no successes.
        Assert.Equal(3, events.Count(e => e.StepFailedDetails != null && e.Name == "always_fails_step"));
        Assert.DoesNotContain(events, e => e.StepSucceededDetails != null);

        // Each recorded failure carries the right per-attempt message.
        var failures = events
            .Where(e => e.StepFailedDetails != null && e.Name == "always_fails_step")
            .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
            .ToList();
        Assert.Contains(failures, m => m.Contains("attempt 1"));
        Assert.Contains(failures, m => m.Contains("attempt 2"));
        Assert.Contains(failures, m => m.Contains("attempt 3"));

        // Service honored the retry delays. No-jitter exponential backoff at 2s/4s
        // means the gap between the first and last StepStarted is >= 6s.
        var startedTimestamps = events
            .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
            .OrderBy(e => e.EventTimestamp!.Value)
            .Select(e => e.EventTimestamp!.Value)
            .ToList();

        // Events without a timestamp are filtered out above; fail with a readable
        // message instead of an index-out-of-range if too few remain.
        Assert.True(startedTimestamps.Count >= 2,
            $"Expected at least 2 timestamped StepStarted events, got {startedTimestamps.Count}");

        var totalGap = startedTimestamps[^1] - startedTimestamps[0];
        _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
        Assert.True(totalGap >= TimeSpan.FromSeconds(6),
            $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
    }
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs new file mode 100644 index 000000000..82be3d105 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs @@ -0,0 +1,78 @@
using System.Linq;
using System.Text;
using Amazon.Lambda.Model;
using Xunit;
using Xunit.Abstractions;

namespace Amazon.Lambda.DurableExecution.IntegrationTests;

public class RetryTest
{
    private readonly ITestOutputHelper _output;
    public RetryTest(ITestOutputHelper output) => _output = output;

    /// <summary>
    /// End-to-end retry: step throws on attempts 1 and 2, succeeds on attempt 3.
    /// Validates that the service honors the RETRY checkpoint, schedules the
    /// requested delay, and re-invokes the Lambda — none of which the unit
    /// tests can prove (they fake state transitions in-memory).
    /// </summary>
    [Fact]
    public async Task FlakyStep_RetriesAndSucceedsOnThirdAttempt()
    {
        await using var deployment = await DurableFunctionDeployment.CreateAsync(
            DurableFunctionDeployment.FindTestFunctionDir("RetryFunction"),
            "retry", _output);

        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
        _output.WriteLine($"Response: {responsePayload}");

        // Initial invoke returns when the SDK suspends after the first failure.
        // The execution continues asynchronously via service-driven re-invokes.
        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
        Assert.NotNull(arn);

        // Total expected wall time: 2s + 4s of retry delay + execution overhead.
        // Allow generous headroom for service scheduling latency.
        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
        Assert.Equal("SUCCEEDED", status, ignoreCase: true);

        var history = await deployment.WaitForHistoryAsync(
            arn!,
            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
                && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
            TimeSpan.FromSeconds(60));

        // NOTE(review): the element type was lost in extraction; Event (Amazon.Lambda.Model)
        // is assumed here — confirm against the SDK model.
        var events = history.Events ?? new List<Event>();

        // Three attempts ran (attempts 1, 2, 3).
        Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));

        // Two failed attempts recorded retry metadata; the final attempt succeeded.
        Assert.Equal(2, events.Count(e => e.StepFailedDetails != null && e.Name == "flaky_step"));
        var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "flaky_step");
        Assert.NotNull(succeeded);
        Assert.Equal("\"ok on attempt 3\"", succeeded!.StepSucceededDetails.Result?.Payload);

        // The two recorded failure messages reflect the per-attempt exception.
        var failures = events
            .Where(e => e.StepFailedDetails != null && e.Name == "flaky_step")
            .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
            .ToList();
        Assert.Contains(failures, m => m.Contains("attempt 1"));
        Assert.Contains(failures, m => m.Contains("attempt 2"));

        // Timing check: the service must have actually waited between attempts.
        // With initialDelay=2s, backoffRate=2.0, no jitter: delays are 2s and 4s.
        // The gap between the first and last StepStarted should be >= 6s.
        var startedTimestamps = events
            .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
            .OrderBy(e => e.EventTimestamp!.Value)
            .Select(e => e.EventTimestamp!.Value)
            .ToList();

        // Events without a timestamp are filtered out above; fail with a readable
        // message instead of an index-out-of-range if too few remain.
        Assert.True(startedTimestamps.Count >= 2,
            $"Expected at least 2 timestamped StepStarted events, got {startedTimestamps.Count}");

        var totalGap = startedTimestamps[^1] - startedTimestamps[0];
        _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
        Assert.True(totalGap >= TimeSpan.FromSeconds(6),
            $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
    }
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs index 7b2afd427..b51e26b2d 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs @@ -36,10 +36,13 @@ public async Task StepFails_PropagatesAsFailedStatus() var history = await deployment.WaitForHistoryAsync( arn!, - h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false, + h => (h.Events?.Any(e => e.EventType == EventType.StepStarted) ?? false) + && (h.Events?.Any(e => e.StepFailedDetails != null) ?? false), TimeSpan.FromSeconds(60)); var events = history.Events ?? new List(); + Assert.Equal(1, events.Count(e => e.EventType == EventType.StepStarted)); + // The failing step recorded a StepFailed event with the exception message. 
var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "fail_step"); Assert.NotNull(stepFailed); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs index 684486dd9..05e2bfc72 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs @@ -32,11 +32,14 @@ public async Task StepWaitStep_CompletesViaService() var history = await deployment.WaitForHistoryAsync( arn!, - h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 + h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2 + && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false), TimeSpan.FromSeconds(60)); var events = history.Events ?? new List(); + Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted)); + // Both steps ran in order and produced the expected chained outputs. 
var stepResults = events .Where(e => e.StepSucceededDetails != null) diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs new file mode 100644 index 000000000..0bfda43cd --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs @@ -0,0 +1,69 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +/// +/// Exercises the AtMostOncePerRetry crash-recovery path end-to-end. 
+/// +/// On attempt 1 the step kills the Lambda process AFTER the START checkpoint +/// has been flushed but BEFORE any SUCCEED checkpoint can be written. The +/// service re-invokes us; replay sees STARTED with no terminal record, so the +/// SDK routes through the retry strategy with a synthesized +/// StepInterruptedException. Attempt 2 succeeds normally. +/// +/// The per-attempt counter is read from the input payload — the durable +/// service preserves it across re-invokes so we can drive deterministic crash +/// behavior on attempt 1 only. +/// +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var result = await context.StepAsync( + async (ctx) => + { + await Task.CompletedTask; + if (ctx.AttemptNumber == 1) + { + // Hard process exit AFTER the SDK has flushed the START + // checkpoint (sync flush is part of the AtMostOncePerRetry + // contract). The service will see a STARTED record with no + // terminal counterpart on the next invocation. 
+ Environment.Exit(137); + } + return $"recovered on attempt {ctx.AttemptNumber}"; + }, + name: "crash_then_recover", + config: new StepConfig + { + Semantics = StepSemantics.AtMostOncePerRetry, + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(5), + backoffRate: 2.0, + jitter: JitterStrategy.None) + }); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs new file mode 100644 index 000000000..dbb4b0d3b --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs @@ -0,0 +1,57 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +/// +/// Five-failure retry chain: the step throws on attempts 1-5 and succeeds on +/// attempt 6. 
The result payload echoes ctx.AttemptNumber on each attempt so +/// the integration test can verify the SDK's user-facing attempt counter +/// matches the wire-format StepDetails.Attempt value across multiple +/// invocations. +/// +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var result = await context.StepAsync( + async (ctx) => + { + await Task.CompletedTask; + if (ctx.AttemptNumber < 6) + throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}"); + return $"ok on attempt {ctx.AttemptNumber}"; + }, + name: "long_retry_step", + config: new StepConfig + { + // Short delays so the test wall time stays manageable: 1s, 2s, 3s, 4s, 5s. + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 6, + initialDelay: TimeSpan.FromSeconds(1), + maxDelay: TimeSpan.FromSeconds(5), + backoffRate: 1.5, + jitter: JitterStrategy.None) + }); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? 
Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs new file mode 100644 index 000000000..5a6551ec2 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs @@ -0,0 +1,47 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var 
handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var result = await context.StepAsync( + async (ctx) => + { + await Task.CompletedTask; + throw new InvalidOperationException($"always-fails attempt {ctx.AttemptNumber}"); + }, + name: "always_fails_step", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(10), + backoffRate: 2.0, + jitter: JitterStrategy.None) + }); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? 
Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs new file mode 100644 index 000000000..9ebffdf11 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs @@ -0,0 +1,49 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, 
serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var result = await context.StepAsync( + async (ctx) => + { + await Task.CompletedTask; + if (ctx.AttemptNumber < 3) + throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}"); + return $"ok on attempt {ctx.AttemptNumber}"; + }, + name: "flaky_step", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(10), + backoffRate: 2.0, + jitter: JitterStrategy.None) + }); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? 
Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs index 7f362d605..50e511cb1 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs @@ -620,4 +620,306 @@ private class TestPerson public string? Name { get; set; } public int Age { get; set; } } + + #region StepAsync Retry Tests + + [Fact] + public async Task StepAsync_FailsWithRetryStrategy_CheckpointsRetryAndSuspends() + { + var tm = new TerminationManager(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(null); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + + var stepTask = context.StepAsync( + async (_) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); }, + name: "flaky_step", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromSeconds(5), + jitter: JitterStrategy.None) + }); + + await Task.Delay(50); + + Assert.True(tm.IsTerminated); + Assert.False(stepTask.IsCompleted); + + // Fresh attempt 1 emits a fire-and-forget START (telemetry under + // AtLeastOncePerRetry), then a 
RETRY when the user code throws and + // the retry strategy decides to retry. + var checkpoints = recorder.Flushed; + Assert.Equal(2, checkpoints.Count); + Assert.Equal("START", checkpoints[0].Action); + Assert.Equal("RETRY", checkpoints[1].Action); + Assert.Equal(IdAt(1), checkpoints[1].Id); + Assert.Equal(5, checkpoints[1].StepOptions.NextAttemptDelaySeconds); + } + + [Fact] + public async Task StepAsync_FailsNoRetryStrategy_CheckpointsFail() + { + var context = CreateContext(); + + var ex = await Assert.ThrowsAsync(() => + context.StepAsync( + async (_) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); }, + name: "fail_step")); + + Assert.Equal("permanent", ex.Message); + } + + [Fact] + public async Task StepAsync_RetryExhausted_CheckpointsFail() + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Pending, + StepDetails = new StepDetails + { + Attempt = 2, + NextAttemptTimestamp = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds() + } + } + } + }); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + + // Attempt 3 (last one) — should fail after this + var ex = await Assert.ThrowsAsync(() => + context.StepAsync( + async (_) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); }, + name: "exhaust_step", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None) + })); + + Assert.Equal("still failing", ex.Message); + + // Fresh attempt 3 emits a fire-and-forget START (telemetry under + // AtLeastOncePerRetry), then a FAIL after the retry strategy gives up. 
+ var checkpoints = recorder.Flushed; + Assert.Equal(2, checkpoints.Count); + Assert.Equal("START", checkpoints[0].Action); + Assert.Equal("FAIL", checkpoints[1].Action); + } + + [Fact] + public async Task StepAsync_PendingWithFutureTimestamp_Suspends() + { + var futureMs = DateTimeOffset.UtcNow.AddSeconds(300).ToUnixTimeMilliseconds(); + var tm = new TerminationManager(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Pending, + StepDetails = new StepDetails + { + Attempt = 1, + NextAttemptTimestamp = futureMs + } + } + } + }); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + + var stepTask = context.StepAsync( + async (_) => { await Task.CompletedTask; return "should not run"; }, + name: "pending_step", + config: new StepConfig { RetryStrategy = RetryStrategy.Default }); + + await Task.Delay(50); + + Assert.True(tm.IsTerminated); + Assert.False(stepTask.IsCompleted); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task StepAsync_PendingWithPastTimestamp_ReExecutes() + { + var pastMs = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Pending, + StepDetails = new StepDetails + { + Attempt = 1, + NextAttemptTimestamp = pastMs + } + } + } + }); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + + var result = await context.StepAsync( + async 
(ctx) => + { + await Task.CompletedTask; + Assert.Equal(2, ctx.AttemptNumber); + return "retry success"; + }, + name: "retry_step", + config: new StepConfig { RetryStrategy = RetryStrategy.Default }); + + Assert.Equal("retry success", result); + } + + [Fact] + public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes() + { + // READY = service has post-PENDING re-invoked us; the retry timer + // already fired so no timestamp check is needed. Just advance the + // attempt counter and run. + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Ready, + StepDetails = new StepDetails { Attempt = 2 } + } + } + }); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + + var executed = false; + var result = await context.StepAsync( + async (ctx) => + { + executed = true; + Assert.Equal(3, ctx.AttemptNumber); + await Task.CompletedTask; + return "ok"; + }, + name: "ready_step", + config: new StepConfig { RetryStrategy = RetryStrategy.Default }); + + Assert.True(executed); + Assert.Equal("ok", result); + Assert.False(tm.IsTerminated); + Assert.False(state.IsReplaying); + } + + [Fact] + public async Task StepAsync_AtMostOnce_FlushesStartBeforeExecution() + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(null); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + + IReadOnlyList? 
flushedAtFuncEntry = null; + + var result = await context.StepAsync( + async (_) => + { + flushedAtFuncEntry = recorder.Flushed.Select(o => o.Action.ToString()).ToArray(); + await Task.CompletedTask; + return "done"; + }, + name: "amo_step", + config: new StepConfig { Semantics = StepSemantics.AtMostOncePerRetry }); + + Assert.Equal("done", result); + + // START must be flushed before user func runs (AtMostOnce invariant). + Assert.NotNull(flushedAtFuncEntry); + Assert.Equal(new[] { "START" }, flushedAtFuncEntry); + + // After step returns, SUCCEED has also been flushed. + var actions = recorder.Flushed.Select(o => o.Action.ToString()).ToArray(); + Assert.Equal(new[] { "START", "SUCCEED" }, actions); + } + + [Fact] + public async Task StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler() + { + var tm = new TerminationManager(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Started + } + } + }); + var idGen = new OperationIdGenerator(); + var lambdaContext = CreateLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + + var executed = false; + var stepTask = context.StepAsync( + async (_) => { executed = true; await Task.CompletedTask; return "should not run"; }, + name: "amo_replay", + config: new StepConfig + { + Semantics = StepSemantics.AtMostOncePerRetry, + RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None) + }); + + await Task.Delay(50); + + Assert.False(executed); + Assert.True(tm.IsTerminated); + Assert.False(stepTask.IsCompleted); + + var checkpoints = recorder.Flushed; + Assert.Single(checkpoints); + Assert.Equal("RETRY", checkpoints[0].Action); + } + + #endregion } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs 
b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs index a967c1ce2..2e3841c89 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs @@ -198,30 +198,37 @@ public async Task WrapAsync_CheckpointsAreSentToService() mockClient); Assert.Equal(InvocationStatus.Pending, output.Status); - Assert.Equal(2, mockClient.CheckpointCalls.Count); - - // First flush: step SUCCEED (the user awaits StepAsync, which awaits - // its SUCCEED enqueue, which blocks until the batcher flushes it). - var firstCall = mockClient.CheckpointCalls[0]; - Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", firstCall.DurableExecutionArn); - Assert.Equal("initial-token", firstCall.CheckpointToken); - Assert.Single(firstCall.Updates); - var stepUpdate = firstCall.Updates[0]; - Assert.Equal("STEP", stepUpdate.Type); - Assert.Equal("SUCCEED", stepUpdate.Action); - Assert.Equal("validate", stepUpdate.Name); - Assert.NotNull(stepUpdate.Payload); - - // Second flush: wait START (blocks until the service has the timer - // recorded before WaitAsync suspends). - var secondCall = mockClient.CheckpointCalls[1]; - Assert.Single(secondCall.Updates); - var waitUpdate = secondCall.Updates[0]; - Assert.Equal("WAIT", waitUpdate.Type); - Assert.Equal("START", waitUpdate.Action); - Assert.Equal("delay", waitUpdate.Name); - Assert.NotNull(waitUpdate.WaitOptions); - Assert.Equal(30, waitUpdate.WaitOptions.WaitSeconds); + + // Each StepAsync emits a fire-and-forget START before user code runs + // (telemetry under AtLeastOncePerRetry). With FlushInterval = 0 the + // worker may flush the START on its own before SUCCEED arrives, so the + // exact batching of START vs SUCCEED is timing-dependent. Assert on + // the flat sequence of updates instead. 
+ var allUpdates = mockClient.CheckpointCalls + .SelectMany(c => c.Updates) + .ToList(); + + // Expect: step START, step SUCCEED, wait START (in that order). + Assert.Equal(3, allUpdates.Count); + + Assert.Equal("STEP", allUpdates[0].Type); + Assert.Equal("START", allUpdates[0].Action); + Assert.Equal("validate", allUpdates[0].Name); + + Assert.Equal("STEP", allUpdates[1].Type); + Assert.Equal("SUCCEED", allUpdates[1].Action); + Assert.Equal("validate", allUpdates[1].Name); + Assert.NotNull(allUpdates[1].Payload); + + Assert.Equal("WAIT", allUpdates[2].Type); + Assert.Equal("START", allUpdates[2].Action); + Assert.Equal("delay", allUpdates[2].Name); + Assert.NotNull(allUpdates[2].WaitOptions); + Assert.Equal(30, allUpdates[2].WaitOptions.WaitSeconds); + + // The first call sends the initial checkpoint token. + Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", mockClient.CheckpointCalls[0].DurableExecutionArn); + Assert.Equal("initial-token", mockClient.CheckpointCalls[0].CheckpointToken); } [Fact] @@ -502,9 +509,9 @@ public async Task WrapAsync_CheckpointThrowsTransient_PropagatesToHost(AmazonSer public async Task WrapAsync_HydrationThrows_AlwaysPropagatesToHost() { // State hydration is OUTSIDE the IsTerminalCheckpointError try/catch — every - // GetExecutionStateAsync failure escapes for Lambda retry, matching Python's - // GetExecutionStateError (an InvocationError). Use a 4xx that *would* be terminal - // if it came from a checkpoint flush to prove the path isn't classified. + // GetExecutionStateAsync failure escapes for Lambda retry. Use a 4xx that + // *would* be terminal if it came from a checkpoint flush to prove the path + // isn't classified. 
var input = new DurableExecutionInvocationInput { DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:hydrate-fail", diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs index cacc68a62..ea4fe0b03 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs @@ -49,8 +49,7 @@ public void LoadFromCheckpoint_OnlyExecutionInputOp_NotReplaying() { // The service sends one EXECUTION-type op carrying the input payload // even on the first invocation. That op is bookkeeping, not user - // history — it must not put us into replay mode. (Matches Python - // execution.py:258, Java ExecutionManager:81, JS execution-context.ts:62.) + // history — it must not put us into replay mode. var state = new ExecutionState(); state.LoadFromCheckpoint(new InitialExecutionState { @@ -105,8 +104,8 @@ public void TrackReplay_PendingOpDoesNotBlockTransition() { // A PENDING op (e.g. retry timer waiting) is not "completed" in the // checkpoint sense — once the workflow has visited every terminally- - // completed op the SDK treats subsequent code as fresh. Matches Python's - // {SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT} terminal set. + // completed op the SDK treats subsequent code as fresh. Terminal set + // is {SUCCEEDED, FAILED, CANCELLED, STOPPED}. var state = new ExecutionState(); state.LoadFromCheckpoint(new InitialExecutionState { @@ -199,8 +198,8 @@ public void HasOperation_ReturnsTrueForExisting() public void GetOperation_ReturnsLatestRecord_WhenIdAppearsMultipleTimes() { // Wire format: when the service replays an envelope it includes the - // most recent record per ID. Java/Python/JS reference SDKs all key by - // ID alone and rely on the service to provide the authoritative record. + // most recent record per ID. 
We key by ID alone and rely on the service + // to provide the authoritative record. var state = new ExecutionState(); state.LoadFromCheckpoint(new InitialExecutionState { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs new file mode 100644 index 000000000..e5a277fb6 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs @@ -0,0 +1,202 @@ +using Amazon.Lambda.DurableExecution; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class RetryStrategyTests +{ + [Fact] + public void ExponentialDefault_RetriesUpToMaxAttempts() + { + var strategy = RetryStrategy.Default; + + // Attempts 1-5 should retry (maxAttempts=6 means 6 total attempts) + for (int i = 1; i < 6; i++) + { + var decision = strategy.ShouldRetry(new InvalidOperationException("fail"), i); + Assert.True(decision.ShouldRetry); + Assert.True(decision.Delay >= TimeSpan.FromSeconds(1)); + } + + // Attempt 6 should not retry (exhausted) + var lastDecision = strategy.ShouldRetry(new InvalidOperationException("fail"), 6); + Assert.False(lastDecision.ShouldRetry); + } + + [Fact] + public void None_NeverRetries() + { + var strategy = RetryStrategy.None; + + var decision = strategy.ShouldRetry(new Exception("fail"), 1); + Assert.False(decision.ShouldRetry); + } + + [Fact] + public void Transient_RetriesUpTo3Attempts() + { + var strategy = RetryStrategy.Transient; + + Assert.True(strategy.ShouldRetry(new Exception("fail"), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new Exception("fail"), 2).ShouldRetry); + Assert.False(strategy.ShouldRetry(new Exception("fail"), 3).ShouldRetry); + } + + [Fact] + public void Exponential_DelayIncreases() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 5, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(120), + backoffRate: 2.0, + jitter: JitterStrategy.None); + + 
var d1 = strategy.ShouldRetry(new Exception(), 1).Delay; + var d2 = strategy.ShouldRetry(new Exception(), 2).Delay; + var d3 = strategy.ShouldRetry(new Exception(), 3).Delay; + + // With no jitter: 2s, 4s, 8s (ceiling to whole seconds) + Assert.Equal(TimeSpan.FromSeconds(2), d1); + Assert.Equal(TimeSpan.FromSeconds(4), d2); + Assert.Equal(TimeSpan.FromSeconds(8), d3); + } + + [Fact] + public void Exponential_DelayCapsAtMax() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 10, + initialDelay: TimeSpan.FromSeconds(10), + maxDelay: TimeSpan.FromSeconds(30), + backoffRate: 3.0, + jitter: JitterStrategy.None); + + // Attempt 3: 10 * 3^2 = 90, capped to 30 + var decision = strategy.ShouldRetry(new Exception(), 3); + Assert.Equal(TimeSpan.FromSeconds(30), decision.Delay); + } + + [Fact] + public void Exponential_FullJitter_BoundedByDelay() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 5, + initialDelay: TimeSpan.FromSeconds(10), + maxDelay: TimeSpan.FromSeconds(100), + backoffRate: 2.0, + jitter: JitterStrategy.Full); + + // Run multiple times to check bounds + for (int i = 0; i < 50; i++) + { + var decision = strategy.ShouldRetry(new Exception(), 1); + Assert.True(decision.Delay >= TimeSpan.FromSeconds(1)); + Assert.True(decision.Delay <= TimeSpan.FromSeconds(10)); + } + } + + [Fact] + public void Exponential_HalfJitter_BoundedBetween50And100Percent() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 5, + initialDelay: TimeSpan.FromSeconds(10), + maxDelay: TimeSpan.FromSeconds(100), + backoffRate: 2.0, + jitter: JitterStrategy.Half); + + for (int i = 0; i < 50; i++) + { + var decision = strategy.ShouldRetry(new Exception(), 1); + Assert.True(decision.Delay >= TimeSpan.FromSeconds(5)); + Assert.True(decision.Delay <= TimeSpan.FromSeconds(10)); + } + } + + [Fact] + public void Exponential_RetryableExceptions_FiltersCorrectly() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableExceptions: new[] { 
typeof(TimeoutException), typeof(HttpRequestException) }); + + Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new HttpRequestException(), 1).ShouldRetry); + Assert.False(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry); + } + + [Fact] + public void Exponential_RetryableExceptions_MatchesDerivedTypes() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableExceptions: new[] { typeof(IOException) }); + + Assert.True(strategy.ShouldRetry(new FileNotFoundException(), 1).ShouldRetry); + } + + [Fact] + public void Exponential_MessagePatterns_FiltersCorrectly() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableMessagePatterns: new[] { "timeout", "throttl", "5\\d{2}" }); + + Assert.True(strategy.ShouldRetry(new Exception("connection timeout"), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new Exception("request throttled"), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new Exception("HTTP 503"), 1).ShouldRetry); + Assert.False(strategy.ShouldRetry(new Exception("not found"), 1).ShouldRetry); + } + + [Fact] + public void Exponential_BothFilters_EitherMatches() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableExceptions: new[] { typeof(TimeoutException) }, + retryableMessagePatterns: new[] { "throttl" }); + + // Matches exception type + Assert.True(strategy.ShouldRetry(new TimeoutException("any message"), 1).ShouldRetry); + // Matches message pattern + Assert.True(strategy.ShouldRetry(new Exception("throttled"), 1).ShouldRetry); + // Matches neither + Assert.False(strategy.ShouldRetry(new InvalidOperationException("bad state"), 1).ShouldRetry); + } + + [Fact] + public void Exponential_NoFilters_RetriesAllExceptions() + { + var strategy = RetryStrategy.Exponential(maxAttempts: 3); + + Assert.True(strategy.ShouldRetry(new Exception("anything"), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new 
InvalidOperationException(), 1).ShouldRetry); + Assert.True(strategy.ShouldRetry(new OutOfMemoryException(), 1).ShouldRetry); + } + + [Fact] + public void Exponential_MinimumDelayIsOneSecond() + { + var strategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromMilliseconds(100), + jitter: JitterStrategy.None); + + var decision = strategy.ShouldRetry(new Exception(), 1); + Assert.True(decision.Delay >= TimeSpan.FromSeconds(1)); + } + + [Fact] + public void FromDelegate_UsesProvidedFunction() + { + var strategy = RetryStrategy.FromDelegate((ex, attempt) => + attempt < 2 && ex is TimeoutException + ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(5)) + : RetryDecision.DoNotRetry()); + + Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry); + Assert.False(strategy.ShouldRetry(new TimeoutException(), 2).ShouldRetry); + Assert.False(strategy.ShouldRetry(new Exception(), 1).ShouldRetry); + } +} From 9c3c3b942a459355eddcd783565e7f32de19cfee Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Thu, 21 May 2026 14:20:18 -0400 Subject: [PATCH 2/2] Address review feedback (phil comment): clarify ReplayPending docs, drop RegexOptions.Compiled --- .../Internal/StepOperation.cs | 10 +++++++--- .../Amazon.Lambda.DurableExecution/RetryStrategy.cs | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs index ffdaaffd7..b764e07c2 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs @@ -109,9 +109,13 @@ private Task ReplayReady(Operation ready, CancellationToken cancellationToken } /// - /// PENDING means a retry was scheduled (RETRY checkpoint). If - /// NextAttemptTimestamp is in the future, re-suspend; otherwise the timer - /// has fired and we run the next attempt. + /// PENDING means a retry was scheduled (RETRY checkpoint).
The service's + /// transition to READY when the timer fires is the authoritative "timer + /// fired" signal; we are re-invoked while still PENDING only if the service + /// re-invokes slightly early. The wall-clock check below is a safety net + /// for that case. Clock skew cannot cause a missed retry: if our clock is + /// fast we just run the attempt early, and if it is slow we re-suspend and + /// the service's READY transition takes over. /// private Task ReplayPending(Operation pending, CancellationToken cancellationToken) { diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs index 9cb6edbde..7ac331d69 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs @@ -133,7 +133,7 @@ public ExponentialRetryStrategy( _jitter = jitter; _retryableExceptions = retryableExceptions; _retryableMessagePatterns = retryableMessagePatterns? - .Select(p => new Regex(p, RegexOptions.Compiled)) + .Select(p => new Regex(p)) .ToArray(); }