diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
index 0f724b4a2..bb95c8476 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
@@ -47,3 +47,23 @@ public StepException(string message) : base(message) { }
/// Creates a wrapping an inner exception.
public StepException(string message, Exception innerException) : base(message, innerException) { }
}
+
+///
+/// Thrown when a step under is
+/// detected to have been interrupted mid-execution on a prior invocation
+/// (replay sees a STARTED checkpoint with no terminal record).
+///
+///
+/// Surfaces in so user-supplied
+/// strategies can distinguish "my code threw" from "a previous attempt
+/// crashed before it could record a result".
+///
+public class StepInterruptedException : StepException
+{
+ /// Creates an empty .
+ public StepInterruptedException() { }
+ /// Creates a with the given message.
+ public StepInterruptedException(string message) : base(message) { }
+ /// Creates a wrapping an inner exception.
+ public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
index 85ee73040..e0d03747c 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
@@ -155,8 +155,7 @@ private static async Task WrapAsyncCoreGetExecutionStateAsync) propagate as
/// too, but they are NOT caught here — they
- /// flow up to the host so Lambda retries, matching Python's GetExecutionStateError
- /// (which extends InvocationError).
+ /// flow up to the host so Lambda retries.
///
/// User-code SDK errors (e.g. an SDK call inside a Step body) are caught by
/// StepRunner and surfaced as StepException for the workflow's normal
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs
new file mode 100644
index 000000000..f291bed1e
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs
@@ -0,0 +1,39 @@
+namespace Amazon.Lambda.DurableExecution;
+
+///
+/// Determines whether a failed step should be retried and with what delay.
+///
+public interface IRetryStrategy
+{
+ ///
+ /// Evaluates whether the given exception warrants a retry.
+ ///
+ /// The exception that caused the step to fail.
+ /// The 1-based attempt number that just failed.
+ /// A decision indicating whether to retry and the delay before the next attempt.
+ RetryDecision ShouldRetry(Exception exception, int attemptNumber);
+}
+
+///
+/// The outcome of a retry evaluation.
+///
+public readonly struct RetryDecision
+{
+ /// Whether the step should be retried.
+ public bool ShouldRetry { get; }
+
+ /// The delay before the next retry attempt.
+ public TimeSpan Delay { get; }
+
+ private RetryDecision(bool shouldRetry, TimeSpan delay)
+ {
+ ShouldRetry = shouldRetry;
+ Delay = delay;
+ }
+
+ /// Indicates the step should not be retried.
+ public static RetryDecision DoNotRetry() => new(false, TimeSpan.Zero);
+
+ /// Indicates the step should be retried after the specified delay.
+ public static RetryDecision RetryAfter(TimeSpan delay) => new(true, delay);
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
index 8039e7c56..e098606f0 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
@@ -11,13 +11,12 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// call awaits the flush of its containing batch (sync semantics).
///
///
-/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when
-/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need
-/// a fire-and-forget overload like
-/// Task EnqueueAsync(SdkOperationUpdate update, bool sync) where
-/// sync=false returns as soon as the item is queued. Java's
-/// sendOperationUpdate vs sendOperationUpdateAsync is the model.
-/// Today every call site is sync, so the API stays minimal.
+/// Fire-and-forget semantics are achieved by simply not awaiting the returned
+/// Task. Errors still surface deterministically via _terminalError: the
+/// next sync or rethrows.
+/// Callers using fire-and-forget should observe the discarded Task's exception
+/// (see StepOperation.FireAndForget) so it doesn't trip the runtime's
+/// UnobservedTaskException event.
///
internal sealed class CheckpointBatcher : IAsyncDisposable
{
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs
index a5e60b98e..2b7c0b2df 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs
@@ -24,12 +24,10 @@ internal sealed class CheckpointBatcherConfig
///
/// TODO: not enforced today. The worker only checks ;
/// a single oversized item (or a batch whose serialized size exceeds 750 KB)
- /// will be sent to the service and rejected there. Java/JS/Python all
- /// pre-flight this on the in-flight batch and split before the next add.
- /// Wire this in alongside the async-flush operations (Map / Parallel /
- /// child-context) since those are the scenarios that can actually fill a
- /// batch — today every batch is 1 item with
- /// = Zero, so the gap is latent.
+ /// will be sent to the service and rejected there. Wire this in alongside
+ /// the async-flush operations (Map / Parallel / child-context) since those
+ /// are the scenarios that can actually fill a batch — today every batch is
+ /// 1 item with = Zero, so the gap is latent.
///
internal int MaxBatchBytes { get; init; } = 750 * 1024;
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs
index f936d3d24..ef27691a3 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs
@@ -6,7 +6,6 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// ; this type is the inbound side only.
///
///
-/// Replay tracking mirrors the Python / Java / JavaScript reference SDKs:
///
/// - At construction the workflow is "replaying" if and only if any user-replayable
/// op is present. The service always sends one EXECUTION-type op
@@ -41,11 +40,15 @@ public void LoadFromCheckpoint(InitialExecutionState? initialState)
AddOperations(initialState.Operations);
}
- // Only user-replayable ops put us into replay mode. The service-side
- // EXECUTION op (input payload bookkeeping) is always present and must
- // not count — see Python execution.py:258 / Java ExecutionManager:81 /
- // JS execution-context.ts:62 for the same rule.
- (_isReplaying, _remainingReplayOps) = ScanReplayable();
+ // We're "replaying" when there are completed ops (SUCCEEDED, FAILED,
+ // CANCELLED, STOPPED) we need to re-derive before resuming live work.
+ // The service-side EXECUTION op (input payload bookkeeping) is always
+ // present and doesn't count. If the only ops are in-progress
+ // (READY/PENDING/STARTED), there's nothing to re-derive — the next
+ // user call IS the next thing to run — so IsReplaying starts false.
+ var (_, terminalCount) = ScanReplayable();
+ _remainingReplayOps = terminalCount;
+ _isReplaying = terminalCount > 0;
}
public void AddOperations(IEnumerable operations)
@@ -91,8 +94,11 @@ public void TrackReplay(string operationId)
public void ValidateReplayConsistency(string operationId, string expectedType, string? expectedName)
{
- if (!_isReplaying) return;
-
+ // Independent of IsReplaying: as long as a checkpoint record exists
+ // for this id, its type/name must match what user code is asking for.
+ // If the only checkpointed ops are in-progress (PENDING/READY/STARTED),
+ // IsReplaying is false but the records still exist and code drift can
+ // still produce a mismatch.
if (!_operations.TryGetValue(operationId, out var op)) return;
if (op.Type != null && op.Type != expectedType)
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs
index 4e9527d3c..4dd03d328 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs
@@ -8,11 +8,10 @@ namespace Amazon.Lambda.DurableExecution.Internal;
///
/// Generates deterministic operation IDs for durable operations. Each call
/// increments an internal counter and SHA-256 hashes "<parentId>-<counter>"
-/// (or just "<counter>" at the root). Hashing matches the wire format
-/// used by the Java/JS/Python SDKs so the same workflow position produces a
-/// stable, opaque ID across replays — and the human-readable step name is
-/// carried separately on OperationUpdate.Name, so renaming a step does
-/// not break replay correlation.
+/// (or just "<counter>" at the root). The same workflow position
+/// produces a stable, opaque ID across replays — and the human-readable step
+/// name is carried separately on OperationUpdate.Name, so renaming a
+/// step does not break replay correlation.
///
internal sealed class OperationIdGenerator
{
@@ -46,7 +45,7 @@ public OperationIdGenerator(string? parentId)
///
/// Generates the next operation ID. The counter is pre-incremented so the
- /// first ID is hash("1"), matching the reference SDKs.
+ /// first ID is hash("1").
///
///
/// Uses so concurrent callers
@@ -55,7 +54,7 @@ public OperationIdGenerator(string? parentId)
/// MapAsync branches that fan out before awaiting) cannot collide
/// on the same ID. Determinism still requires that calls happen in a
/// deterministic order — atomicity prevents duplicate IDs but not
- /// reordering between replays. Matches Java's AtomicInteger.incrementAndGet.
+ /// reordering between replays.
///
public string NextId()
{
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
index 42c9e3461..b764e07c2 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
@@ -5,20 +5,27 @@
using Microsoft.Extensions.Logging;
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+using SdkStepOptions = Amazon.Lambda.Model.StepOptions;
namespace Amazon.Lambda.DurableExecution.Internal;
///
-/// Durable step operation. Runs the user's function once across the lifetime
-/// of a durable execution, persisting its result so subsequent invocations
-/// replay the cached value without re-executing.
+/// Durable step operation. Runs the user's function (with retry support),
+/// persisting its result so subsequent invocations replay the cached value
+/// without re-executing.
///
///
-/// Replay semantics — example: await ctx.StepAsync(ChargeCard, "charge")
+/// Replay branches — example: await ctx.StepAsync(ChargeCard, "charge")
///
-/// - Fresh: no prior state → run func → emit SUCCEED → return result.
-/// - Replay (SUCCEEDED): return cached result; func is NOT re-executed.
-/// - Replay (FAILED): re-throw the recorded exception.
+/// - Fresh: no prior state → run func → emit SUCCEED → return.
+/// - SUCCEEDED: return cached result; func is NOT re-executed.
+/// - FAILED: re-throw the recorded exception.
+/// - PENDING (retry timer not yet fired): re-suspend without
+/// running func; service re-invokes once NextAttemptTimestamp elapses.
+/// - STARTED + AtMostOncePerRetry: crash recovery — treat as a
+/// failed attempt, route through retry strategy.
+/// - READY: service has post-PENDING re-invoked us; the retry
+/// timer fired and the next attempt is up. Run it.
///
/// Serialization is delegated to the registered on
/// . AOT-safe and reflection-based callers
@@ -55,7 +62,7 @@ public StepOperation(
protected override string OperationType => OperationTypes.Step;
protected override Task StartAsync(CancellationToken cancellationToken)
- => ExecuteFunc(cancellationToken);
+ => ExecuteFunc(attemptNumber: 1, cancellationToken);
protected override Task ReplayAsync(Operation existing, CancellationToken cancellationToken)
{
@@ -71,31 +78,130 @@ protected override Task ReplayAsync(Operation existing, CancellationToken can
// user's catch-block flow matches the original execution.
throw CreateStepException(existing);
+ case OperationStatuses.Pending:
+ return ReplayPending(existing, cancellationToken);
+
+ case OperationStatuses.Started:
+ return ReplayStarted(existing, cancellationToken);
+
+ case OperationStatuses.Ready:
+ return ReplayReady(existing, cancellationToken);
+
default:
- // STARTED/READY/PENDING from a prior invocation — no retry logic
- // in this commit, so fall through and execute fresh. (Future work
- // on retries will replace this default with explicit arms.)
- return ExecuteFunc(cancellationToken);
+ // CANCELLED / STOPPED / unrecognized status. Re-running the
+ // step would re-execute side effects and silently mask a
+ // service-state we don't know how to interpret. Fail loud.
+ throw new NonDeterministicExecutionException(
+ $"Step operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
}
}
- private async Task ExecuteFunc(CancellationToken cancellationToken)
+ ///
+ /// READY means the service has post-PENDING re-invoked us — the retry
+ /// timer fired and the step is eligible to run its next attempt. No
+ /// timer check is needed (the service has already decided we're up);
+ /// just advance the attempt counter and execute.
+ ///
+ private Task ReplayReady(Operation ready, CancellationToken cancellationToken)
+ {
+ var attemptNumber = (ready.StepDetails?.Attempt ?? 0) + 1;
+ return ExecuteFunc(attemptNumber, cancellationToken);
+ }
+
+ ///
+ /// PENDING means a retry was scheduled (RETRY checkpoint). The service's
+ /// transition to READY when the timer fires is the authoritative "timer
+ /// fired" signal; we still get re-invoked in PENDING only if the service
+ /// re-invokes slightly early. The wall-clock check below is a safety net
+ /// for that case — clock skew can't cause a missed retry because if our
+ /// clock is fast we just run early, and if it's slow we re-suspend and
+ /// the service's READY transition takes over.
+ ///
+ private Task ReplayPending(Operation pending, CancellationToken cancellationToken)
+ {
+ var nextAttemptTs = pending.StepDetails?.NextAttemptTimestamp;
+ var attemptNumber = (pending.StepDetails?.Attempt ?? 0) + 1;
+
+ if (nextAttemptTs is { } scheduledMs &&
+ DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < scheduledMs)
+ {
+ // Retry timer hasn't fired yet — re-suspend so we don't bill compute
+ // while the timer ticks. Service re-invokes once the timer elapses.
+ return Termination.SuspendAndAwait(
+ TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+ }
+
+ return ExecuteFunc(attemptNumber, cancellationToken);
+ }
+
+ ///
+ /// STARTED means a START checkpoint was written but no SUCCEED/FAIL exists.
+ /// For AtMostOncePerRetry this signals a crash mid-step — treat as failure
+ /// and route through retry. For AtLeastOncePerRetry just re-execute.
+ ///
+ private Task ReplayStarted(Operation started, CancellationToken cancellationToken)
+ {
+ var attemptNumber = (started.StepDetails?.Attempt ?? 0) + 1;
+
+ if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+ {
+ // Re-running func would risk a duplicate side effect (e.g. double
+ // charge). Treat the lost result as a failure; let the retry
+ // strategy decide whether to try again or give up.
+ //
+ // Surface as StepInterruptedException so user strategies can
+ // distinguish "my code threw" from "a prior attempt crashed before
+ // recording a terminal record".
+ var error = started.StepDetails?.Error;
+ var ex = error != null
+ ? new StepInterruptedException(error.ErrorMessage ?? "Step failed on previous attempt") { ErrorType = error.ErrorType }
+ : new StepInterruptedException("Step result lost during AtMostOncePerRetry replay");
+ return HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
+ }
+
+ return ExecuteFunc(attemptNumber, cancellationToken);
+ }
+
+ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- // TODO: emit a STEP_STARTED checkpoint (action = "START") here when retries
- // and/or AtMostOncePerRetry semantics land. AtMostOncePerRetry needs the
- // START to be sync-flushed before user code runs (so replay can detect
- // "we already attempted this and must not re-run"). AtLeastOncePerRetry
- // wants it fire-and-forget for telemetry (attempt timing, retry count in
- // history). Both require the async-flush overload in CheckpointBatcher
- // (see TODO in CheckpointBatcher.cs). Today neither feature is wired up,
- // so the START is intentionally omitted — SUCCEED alone is sufficient
- // for replay correctness in the AtLeastOncePerRetry-only world this PR
- // ships. Java SDK precedent: StepOperation.checkpointStarted().
+ // Emit a START checkpoint before running user code, unless we're already
+ // resuming a STARTED record (which means an earlier attempt wrote it).
+ //
+ // AtMostOncePerRetry: SYNC flush. If Lambda crashes before SUCCEED is
+ // flushed, ReplayStarted routes through retry instead of re-executing.
+ // A queued-but-unflushed START is indistinguishable from "never ran" if
+ // we die, so the sync flush is correctness-load-bearing here.
+ //
+ // AtLeastOncePerRetry (default): FIRE-AND-FORGET. Replay correctness
+ // doesn't depend on the START — SUCCEED alone is sufficient — so this
+ // is purely telemetry (attempt timing, retry count visible in history).
+ if (State.GetOperation(OperationId)?.Status != OperationStatuses.Started)
+ {
+ var startUpdate = new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = OperationAction.START,
+ SubType = OperationSubTypes.Step,
+ Name = Name
+ };
+
+ if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+ {
+ await EnqueueAsync(startUpdate, cancellationToken);
+ }
+ else
+ {
+ FireAndForget(EnqueueAsync(startUpdate, cancellationToken));
+ }
+ }
+
+
try
{
- var stepContext = new StepContext(OperationId, attemptNumber: 1, _logger);
+ var stepContext = new StepContext(OperationId, attemptNumber, _logger);
var result = await _func(stepContext);
await EnqueueAsync(new SdkOperationUpdate
@@ -103,7 +209,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Step,
Action = OperationAction.SUCCEED,
- SubType = "Step",
+ SubType = OperationSubTypes.Step,
Name = Name,
Payload = SerializeResult(result)
}, cancellationToken);
@@ -116,24 +222,64 @@ await EnqueueAsync(new SdkOperationUpdate
}
catch (Exception ex)
{
- // No retry logic in this commit: any thrown exception becomes a
- // FAIL checkpoint and is re-thrown as a StepException. On replay,
- // the FAILED branch above will re-throw without re-executing.
- await EnqueueAsync(new SdkOperationUpdate
- {
- Id = OperationId,
- Type = OperationTypes.Step,
- Action = OperationAction.FAIL,
- SubType = "Step",
- Name = Name,
- Error = ToSdkError(ex)
- }, cancellationToken);
+ // Funnel into the retry/fail decision tree. May checkpoint RETRY and
+ // suspend (Pending), or checkpoint FAIL and rethrow to user.
+ return await HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
+ }
+ }
- throw new StepException(ex.Message, ex)
+ ///
+ /// Funnels a step failure into the retry/fail decision. May checkpoint
+ /// RETRY and suspend (Pending), or checkpoint FAIL and rethrow.
+ ///
+ private async Task HandleStepFailureAsync(Exception ex, int attemptNumber, CancellationToken cancellationToken)
+ {
+ var retryStrategy = _config?.RetryStrategy;
+ if (retryStrategy != null)
+ {
+ var decision = retryStrategy.ShouldRetry(ex, attemptNumber);
+ if (decision.ShouldRetry)
{
- ErrorType = ex.GetType().FullName
- };
+ // Service requires NextAttemptDelaySeconds >= 1. Built-in
+ // strategies already produce >=1s delays; this guard only
+ // matters for user-supplied IRetryStrategy / FromDelegate.
+ var requestedSeconds = decision.Delay.TotalSeconds;
+ var delaySeconds = (int)Math.Max(1, Math.Ceiling(requestedSeconds));
+ if (requestedSeconds < 1)
+ {
+ _logger.LogWarning(
+ "Retry delay for step '{StepName}' attempt {Attempt} was {Requested:F3}s (< 1s); coerced to {Coerced}s.",
+ Name ?? OperationId, attemptNumber, requestedSeconds, delaySeconds);
+ }
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = OperationAction.RETRY,
+ SubType = OperationSubTypes.Step,
+ Name = Name,
+ Error = ToSdkError(ex),
+ StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds }
+ }, cancellationToken);
+ return await Termination.SuspendAndAwait(
+ TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+ }
}
+
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = OperationAction.FAIL,
+ SubType = OperationSubTypes.Step,
+ Name = Name,
+ Error = ToSdkError(ex)
+ }, cancellationToken);
+
+ throw new StepException(ex.Message, ex)
+ {
+ ErrorType = ex.GetType().FullName
+ };
}
private T DeserializeResult(string? serialized)
@@ -168,4 +314,20 @@ private static StepException CreateStepException(Operation failedOp)
ErrorMessage = ex.Message,
StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
};
+
+ ///
+ /// Discards a Task but observes any exception so it doesn't surface as an
+ /// UnobservedTaskException. Used for fire-and-forget START checkpoints
+ /// under AtLeastOncePerRetry semantics. The actual error still propagates
+ /// via CheckpointBatcher._terminalError: the next sync EnqueueAsync
+ /// or DrainAsync will rethrow with the original cause.
+ ///
+ private static void FireAndForget(Task task)
+ {
+ _ = task.ContinueWith(
+ static t => _ = t.Exception,
+ CancellationToken.None,
+ TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
+ TaskScheduler.Default);
+ }
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
index 1350c3d70..5d61e611b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
@@ -6,6 +6,7 @@ namespace Amazon.Lambda.DurableExecution.Internal;
internal enum TerminationReason
{
WaitScheduled,
+ RetryScheduled,
CallbackPending,
InvokePending,
CheckpointFailed
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs
index e8351c120..2c1325974 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs
@@ -49,7 +49,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Wait,
Action = OperationAction.START,
- SubType = "Wait",
+ SubType = OperationSubTypes.Wait,
Name = Name,
WaitOptions = new SdkWaitOptions { WaitSeconds = _waitSeconds }
}, cancellationToken);
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
index 7237eef87..95b6a35b8 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
@@ -167,6 +167,30 @@ public static class OperationTypes
public const string Execution = "EXECUTION";
}
+///
+/// Wire-format string constants. SubType is a
+/// finer-grained classifier sent alongside for
+/// observability — the values are PascalCase ("Step", "Wait") and distinct
+/// from the uppercase values.
+///
+public static class OperationSubTypes
+{
+ /// Step sub-type.
+ public const string Step = "Step";
+
+ /// Wait sub-type.
+ public const string Wait = "Wait";
+
+ /// Callback sub-type.
+ public const string Callback = "Callback";
+
+ /// Chained-invoke sub-type.
+ public const string ChainedInvoke = "ChainedInvoke";
+
+ /// Child-context sub-type.
+ public const string Context = "Context";
+}
+
///
/// Wire-format string constants.
/// Plural name avoids collision with Amazon.Lambda.OperationStatus.
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs
new file mode 100644
index 000000000..7ac331d69
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/RetryStrategy.cs
@@ -0,0 +1,201 @@
+using System.Text.RegularExpressions;
+
+namespace Amazon.Lambda.DurableExecution;
+
+///
+/// Jitter strategy for exponential backoff to prevent thundering-herd scenarios.
+///
+public enum JitterStrategy
+{
+ /// No randomization — delay is exactly the calculated backoff value.
+ None,
+ /// Random delay between 0 and the calculated backoff value (recommended).
+ Full,
+ /// Random delay between 50% and 100% of the calculated backoff value.
+ Half
+}
+
+///
+/// Controls whether a step re-executes if the Lambda is re-invoked mid-attempt.
+///
+public enum StepSemantics
+{
+ ///
+ /// Default. The step may re-execute if the Lambda is re-invoked during execution.
+ /// Use for idempotent operations.
+ ///
+ AtLeastOncePerRetry,
+
+ ///
+ /// The step executes at most once per retry attempt. A START checkpoint is written
+ /// before execution; on replay with an existing START, the SDK skips re-execution
+ /// and proceeds to the retry handler.
+ ///
+ AtMostOncePerRetry
+}
+
+///
+/// Factory methods for common retry strategies.
+///
+public static class RetryStrategy
+{
+ /// 6 attempts, 2x backoff, 5s initial delay, 60s max, Full jitter.
+ public static IRetryStrategy Default { get; } = Exponential(
+ maxAttempts: 6,
+ initialDelay: TimeSpan.FromSeconds(5),
+ maxDelay: TimeSpan.FromSeconds(60),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Full);
+
+ /// 3 attempts, 2x backoff, 1s initial delay, 5s max, Half jitter.
+ public static IRetryStrategy Transient { get; } = Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(1),
+ maxDelay: TimeSpan.FromSeconds(5),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Half);
+
+ /// No retry — 1 attempt only.
+ public static IRetryStrategy None { get; } = Exponential(maxAttempts: 1);
+
+ ///
+ /// Creates an exponential backoff retry strategy.
+ ///
+ ///
+ /// Thrown if < 1, < 1,
+ /// is non-positive, is non-positive,
+ /// or > .
+ ///
+ public static IRetryStrategy Exponential(
+ int maxAttempts = 3,
+ TimeSpan? initialDelay = null,
+ TimeSpan? maxDelay = null,
+ double backoffRate = 2.0,
+ JitterStrategy jitter = JitterStrategy.Full,
+ Type[]? retryableExceptions = null,
+ string[]? retryableMessagePatterns = null)
+ {
+ return new ExponentialRetryStrategy(
+ maxAttempts,
+ initialDelay ?? TimeSpan.FromSeconds(5),
+ maxDelay ?? TimeSpan.FromSeconds(300),
+ backoffRate,
+ jitter,
+ retryableExceptions,
+ retryableMessagePatterns);
+ }
+
+ ///
+ /// Creates a retry strategy from a delegate.
+ ///
+ /// Thrown if is null.
+ public static IRetryStrategy FromDelegate(Func strategy)
+ {
+ if (strategy == null) throw new ArgumentNullException(nameof(strategy));
+ return new DelegateRetryStrategy(strategy);
+ }
+}
+
+internal sealed class ExponentialRetryStrategy : IRetryStrategy
+{
+ private readonly int _maxAttempts;
+ private readonly TimeSpan _initialDelay;
+ private readonly TimeSpan _maxDelay;
+ private readonly double _backoffRate;
+ private readonly JitterStrategy _jitter;
+ private readonly Type[]? _retryableExceptions;
+ private readonly Regex[]? _retryableMessagePatterns;
+
+ public ExponentialRetryStrategy(
+ int maxAttempts,
+ TimeSpan initialDelay,
+ TimeSpan maxDelay,
+ double backoffRate,
+ JitterStrategy jitter,
+ Type[]? retryableExceptions,
+ string[]? retryableMessagePatterns)
+ {
+ if (maxAttempts < 1)
+ throw new ArgumentOutOfRangeException(nameof(maxAttempts), maxAttempts, "must be >= 1");
+ if (initialDelay <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException(nameof(initialDelay), initialDelay, "must be > 0");
+ if (maxDelay <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException(nameof(maxDelay), maxDelay, "must be > 0");
+ if (initialDelay > maxDelay)
+ throw new ArgumentOutOfRangeException(nameof(initialDelay), initialDelay, $"must be <= maxDelay ({maxDelay})");
+ if (backoffRate < 1.0 || double.IsNaN(backoffRate) || double.IsInfinity(backoffRate))
+ throw new ArgumentOutOfRangeException(nameof(backoffRate), backoffRate, "must be a finite value >= 1.0");
+
+ _maxAttempts = maxAttempts;
+ _initialDelay = initialDelay;
+ _maxDelay = maxDelay;
+ _backoffRate = backoffRate;
+ _jitter = jitter;
+ _retryableExceptions = retryableExceptions;
+ _retryableMessagePatterns = retryableMessagePatterns?
+ .Select(p => new Regex(p))
+ .ToArray();
+ }
+
+ public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+ {
+ if (attemptNumber >= _maxAttempts)
+ return RetryDecision.DoNotRetry();
+
+ if (!IsRetryable(exception))
+ return RetryDecision.DoNotRetry();
+
+ var delay = CalculateDelay(attemptNumber);
+ return RetryDecision.RetryAfter(delay);
+ }
+
+ private bool IsRetryable(Exception exception)
+ {
+ if (_retryableExceptions == null && _retryableMessagePatterns == null)
+ return true;
+
+ if (_retryableExceptions != null)
+ {
+ var exType = exception.GetType();
+ if (_retryableExceptions.Any(t => t.IsAssignableFrom(exType)))
+ return true;
+ }
+
+ if (_retryableMessagePatterns != null)
+ {
+ var message = exception.Message;
+ if (_retryableMessagePatterns.Any(p => p.IsMatch(message)))
+ return true;
+ }
+
+ return false;
+ }
+
+ internal TimeSpan CalculateDelay(int attemptNumber)
+ {
+ var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1);
+ var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds);
+
+ var finalDelay = _jitter switch
+ {
+ JitterStrategy.Full => Random.Shared.NextDouble() * cappedDelay,
+ JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.Shared.NextDouble()),
+ _ => cappedDelay
+ };
+
+ return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
+ }
+}
+
+internal sealed class DelegateRetryStrategy : IRetryStrategy
+{
+ private readonly Func _strategy;
+
+ public DelegateRetryStrategy(Func strategy)
+ {
+ _strategy = strategy ?? throw new ArgumentNullException(nameof(strategy));
+ }
+
+ public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+ => _strategy(exception, attemptNumber);
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs
index 2380967de..362867c09 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/StepConfig.cs
@@ -5,9 +5,14 @@ namespace Amazon.Lambda.DurableExecution;
///
public sealed class StepConfig
{
- // TODO: Retry support is deferred to a follow-up PR. When added, this is
- // where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
- // will live. The follow-up needs to use service-mediated retries (checkpoint
- // a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
- // loop, to avoid billing Lambda compute time during retry backoff.
+ ///
+ /// Retry strategy for failed steps. When null (default), failures are not retried.
+ ///
+ public IRetryStrategy? RetryStrategy { get; set; }
+
+ ///
+ /// Controls whether a step may re-execute if the Lambda is re-invoked mid-attempt.
+ /// Default is .
+ ///
+ public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry;
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs
new file mode 100644
index 000000000..7bae660f0
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/AtMostOnceCrashReplayTest.cs
@@ -0,0 +1,81 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class AtMostOnceCrashReplayTest
+{
+ private readonly ITestOutputHelper _output;
+ public AtMostOnceCrashReplayTest(ITestOutputHelper output) => _output = output;
+
+ ///
+ /// Validates the AtMostOncePerRetry crash-recovery wire path: the Lambda
+ /// process is killed mid-step on attempt 1 (after START flush, before
+ /// SUCCEED). On re-invocation the SDK sees a STARTED checkpoint with no
+ /// terminal record and routes through the retry strategy rather than
+ /// re-executing the step. Attempt 2 succeeds.
+ ///
+ /// This is the only path that exercises the StepInterruptedException
+ /// synthesis — the unit-test analogue
+ /// (StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler) fakes the
+ /// STARTED state in-memory and never proves the service actually delivers
+ /// it on a real crash.
+ ///
+ [Fact]
+ public async Task AtMostOnce_StepCrashesMidExecution_RecoversViaRetry()
+ {
+ await using var deployment = await DurableFunctionDeployment.CreateAsync(
+ DurableFunctionDeployment.FindTestFunctionDir("AtMostOnceCrashFunction"),
+ "amocrash", _output);
+
+ var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+ var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+ _output.WriteLine($"Response: {responsePayload}");
+
+ var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+ Assert.NotNull(arn);
+
+ // 2s retry delay + initial-attempt cold-start + recovery invoke. Generous headroom.
+ var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+ Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+ var history = await deployment.WaitForHistoryAsync(
+ arn!,
+ h => (h.Events?.Any(e => e.StepSucceededDetails != null && e.Name == "crash_then_recover") ?? false),
+ TimeSpan.FromSeconds(60));
+ var events = history.Events ?? new List();
+
+ // Attempt 1 was crashed (no SUCCEED), attempt 2 recovered.
+ // We expect exactly one StepSucceeded carrying "recovered on attempt 2".
+ var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "crash_then_recover");
+ Assert.NotNull(succeeded);
+ Assert.Equal("\"recovered on attempt 2\"", succeeded!.StepSucceededDetails.Result?.Payload);
+
+ // Two StepStarted events: one per invocation.
+ Assert.True(
+ events.Count(e => e.EventType == EventType.StepStarted) >= 2,
+ "Expected at least 2 StepStarted events (attempt 1 crashed, attempt 2 recovered).");
+
+ // The crash-recovery branch records the synthesized StepInterruptedException
+ // as a StepFailed event for attempt 1, with a message identifying the lost
+ // attempt rather than a user exception type.
+ var failures = events
+ .Where(e => e.StepFailedDetails != null && e.Name == "crash_then_recover")
+ .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+ .ToList();
+ Assert.NotEmpty(failures);
+ Assert.Contains(failures, m => m.Contains("Step result lost", StringComparison.OrdinalIgnoreCase)
+ || m.Contains("interrupted", StringComparison.OrdinalIgnoreCase)
+ || m.Contains("previous attempt", StringComparison.OrdinalIgnoreCase));
+
+ // The execution actually crossed at least one invocation boundary
+ // (otherwise replay wasn't exercised at all).
+ var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList();
+ Assert.True(
+ invocations.Count >= 2,
+ $"Expected at least 2 InvocationCompleted events (proves crash + replay), got {invocations.Count}");
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
index 8b5bb2e1b..b2ba4bb1a 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
@@ -263,8 +263,16 @@ public async Task WaitForHistoryAsync(
{
last = await GetHistoryAsync(durableExecutionArn, includeExecutionData);
var eventCount = last.Events?.Count ?? 0;
- _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events");
- if (predicate(last)) return last;
+ var typeCounts = last.Events?
+ .GroupBy(e => e.EventType?.Value ?? "")
+ .Select(g => $"{g.Key}:{g.Count()}")
+ .OrderBy(s => s);
+ _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events [{string.Join(",", typeCounts ?? Enumerable.Empty())}]");
+ if (predicate(last))
+ {
+ DumpEvents(last);
+ return last;
+ }
}
catch (Exception ex)
{
@@ -274,9 +282,21 @@ public async Task WaitForHistoryAsync(
}
_output.WriteLine($"[WaitForHistory] gave up after {attempt} attempts; returning last response with {last?.Events?.Count ?? 0} events");
+ if (last != null) DumpEvents(last);
return last ?? throw new TimeoutException($"GetDurableExecutionHistory never succeeded within {timeout.TotalSeconds}s");
}
+ private void DumpEvents(GetDurableExecutionHistoryResponse history)
+ {
+ var events = history.Events ?? new List();
+ _output.WriteLine($"[WaitForHistory] event dump ({events.Count} total):");
+ for (int i = 0; i < events.Count; i++)
+ {
+ var e = events[i];
+ _output.WriteLine($" [{i}] type={e.EventType?.Value ?? ""} name={e.Name ?? ""} ts={e.EventTimestamp:O}");
+ }
+ }
+
public string? ExtractDurableExecutionArn(string responsePayload)
{
try
@@ -375,14 +395,18 @@ await Task.WhenAny(
var stdout = await stdoutTask;
var stderr = await stderrTask;
- if (!string.IsNullOrWhiteSpace(stdout))
- _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
-
if (process.ExitCode != 0)
{
+ // Dump the FULL streams on failure — diagnosing build errors with
+ // truncated output is painful, and these only fire on test failure.
+ _output.WriteLine($"stdout: {stdout}");
_output.WriteLine($"stderr: {stderr}");
- throw new Exception($"{fileName} failed (exit {process.ExitCode}): {stderr}");
+ var detail = !string.IsNullOrWhiteSpace(stderr) ? stderr : stdout;
+ throw new Exception($"{fileName} failed (exit {process.ExitCode}): {detail}");
}
+
+ if (!string.IsNullOrWhiteSpace(stdout))
+ _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
}
public async ValueTask DisposeAsync()
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs
new file mode 100644
index 000000000..105689ecc
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongRetryChainTest.cs
@@ -0,0 +1,77 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class LongRetryChainTest
+{
+ private readonly ITestOutputHelper _output;
+ public LongRetryChainTest(ITestOutputHelper output) => _output = output;
+
+ ///
+ /// Long retry chain across many invocations: step fails 5 times before
+ /// succeeding on attempt 6. Validates that StepDetails.Attempt increments
+ /// monotonically across invocations (no off-by-one, no skipped attempts)
+ /// and that IStepContext.AttemptNumber on the user side matches the wire
+ /// value on each attempt.
+ ///
+ [Fact]
+ public async Task FailsFiveTimesThenSucceeds_AttemptCounterIsMonotonic()
+ {
+ await using var deployment = await DurableFunctionDeployment.CreateAsync(
+ DurableFunctionDeployment.FindTestFunctionDir("LongRetryChainFunction"),
+ "longretry", _output);
+
+ var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+ var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+ _output.WriteLine($"Response: {responsePayload}");
+
+ var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+ Assert.NotNull(arn);
+
+ // Total retry delay budget: 1+2+3+4+5 = 15s. Allow generous headroom.
+ var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(180));
+ Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+ var history = await deployment.WaitForHistoryAsync(
+ arn!,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 6
+ && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
+ TimeSpan.FromSeconds(60));
+ var events = history.Events ?? new List();
+
+ // Six attempts total: five failures + one success.
+ Assert.Equal(6, events.Count(e => e.EventType == EventType.StepStarted));
+ Assert.Equal(5, events.Count(e => e.StepFailedDetails != null && e.Name == "long_retry_step"));
+ var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "long_retry_step");
+ Assert.NotNull(succeeded);
+
+ // The user-facing AttemptNumber on the final (winning) attempt was 6 —
+ // proves IStepContext.AttemptNumber tracks the wire attempt counter
+ // across invocations, not just within a single invocation.
+ Assert.Equal("\"ok on attempt 6\"", succeeded!.StepSucceededDetails.Result?.Payload);
+
+ // Each failure carries a unique per-attempt message — confirms the user-side
+ // counter incremented exactly once per invocation, no duplicates or skips.
+ var failureMessages = events
+ .Where(e => e.StepFailedDetails != null && e.Name == "long_retry_step")
+ .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+ .ToList();
+ Assert.Equal(5, failureMessages.Count);
+ for (int i = 1; i <= 5; i++)
+ {
+ Assert.Contains(failureMessages, m => m.Contains($"attempt {i}"));
+ }
+
+ // The chain was executed across multiple invocations (proves the
+ // service actually re-invoked us between retries instead of holding
+ // a single Lambda alive through all six attempts).
+ var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList();
+ Assert.True(
+ invocations.Count >= 5,
+ $"Expected at least 5 InvocationCompleted events (one per retry boundary), got {invocations.Count}");
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
index 0592d0d44..bfc2913ed 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
@@ -30,11 +30,14 @@ public async Task LongerWait_ExpiresAndCompletes()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
&& (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new List();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Steps before and after the wait both ran, with the post-wait step seeing
// the pre-wait step's value via replay.
var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
index 573ecc082..6b0ae0bc7 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
@@ -32,10 +32,13 @@ public async Task MultipleSteps_AllCheckpointed()
// all events are indexed. Wait until we see all 5 step-succeeded events.
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 5
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
TimeSpan.FromSeconds(60));
var events = history.Events ?? new List();
+ Assert.Equal(5, events.Count(e => e.EventType == EventType.StepStarted));
+
// Each step ran exactly once (no replay-induced duplicates) in declaration order,
// and each step's output chained from the previous one.
var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
index 0fd7aa569..137bb28b8 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
@@ -31,10 +31,13 @@ public async Task ReplayDeterminism_SameGuidAcrossInvocations()
// History is eventually consistent — wait until both step-succeeded events are visible.
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
TimeSpan.FromSeconds(60));
var events = history.Events ?? new List();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Each step succeeded exactly once — generate_id was NOT re-executed on replay
// (a duplicate would show up as two succeeded events for the same name).
var stepSucceededEvents = events.Where(e => e.StepSucceededDetails != null).ToList();
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs
new file mode 100644
index 000000000..c8ab4d85b
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryExhaustionTest.cs
@@ -0,0 +1,80 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class RetryExhaustionTest
+{
+ private readonly ITestOutputHelper _output;
+ public RetryExhaustionTest(ITestOutputHelper output) => _output = output;
+
+ ///
+ /// End-to-end retry exhaustion: step always throws, maxAttempts=3.
+ /// Validates that the SDK records exactly three StepStarted/StepFailed pairs,
+ /// the final attempt produces a FAIL checkpoint (not RETRY), and the workflow
+ /// terminates FAILED with the original exception surfaced through the
+ /// execution-level error.
+ ///
+ [Fact]
+ public async Task AlwaysFailsStep_ExhaustsRetries_TerminatesFailed()
+ {
+ await using var deployment = await DurableFunctionDeployment.CreateAsync(
+ DurableFunctionDeployment.FindTestFunctionDir("RetryExhaustionFunction"),
+ "rexhaust", _output);
+
+ var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+ var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+ _output.WriteLine($"Response: {responsePayload}");
+
+ // Failed workflows return null payload synchronously; locate the execution by name.
+ var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+ Assert.NotNull(arn);
+
+ // 2s + 4s of retry delays + 3x execution overhead. Generous headroom for scheduling.
+ var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+ Assert.Equal("FAILED", status, ignoreCase: true);
+
+ // Execution-level error is the original exception from the final attempt.
+ var execution = await deployment.GetExecutionAsync(arn!);
+ Assert.NotNull(execution.Error);
+ Assert.Contains("attempt 3", execution.Error.ErrorMessage);
+
+ var history = await deployment.WaitForHistoryAsync(
+ arn!,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
+ && (h.Events?.Count(e => e.StepFailedDetails != null) ?? 0) >= 3,
+ TimeSpan.FromSeconds(60));
+ var events = history.Events ?? new List();
+
+ // Three attempts ran in total — no extra (off-by-one) and no truncation.
+ Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));
+
+ // Three failures recorded; no successes.
+ Assert.Equal(3, events.Count(e => e.StepFailedDetails != null && e.Name == "always_fails_step"));
+ Assert.Empty(events.Where(e => e.StepSucceededDetails != null));
+
+ // Each recorded failure carries the right per-attempt message.
+ var failures = events
+ .Where(e => e.StepFailedDetails != null && e.Name == "always_fails_step")
+ .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+ .ToList();
+ Assert.Contains(failures, m => m.Contains("attempt 1"));
+ Assert.Contains(failures, m => m.Contains("attempt 2"));
+ Assert.Contains(failures, m => m.Contains("attempt 3"));
+
+ // Service honored the retry delays. No-jitter exponential backoff at 2s/4s
+ // means the gap between the first and last StepStarted is >= 6s.
+ var startedTimestamps = events
+ .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
+ .OrderBy(e => e.EventTimestamp!.Value)
+ .Select(e => e.EventTimestamp!.Value)
+ .ToList();
+ var totalGap = startedTimestamps[^1] - startedTimestamps[0];
+ _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
+ Assert.True(totalGap >= TimeSpan.FromSeconds(6),
+ $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
new file mode 100644
index 000000000..82be3d105
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
@@ -0,0 +1,78 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class RetryTest
+{
+ private readonly ITestOutputHelper _output;
+ public RetryTest(ITestOutputHelper output) => _output = output;
+
+ ///
+ /// End-to-end retry: step throws on attempts 1 and 2, succeeds on attempt 3.
+ /// Validates that the service honors the RETRY checkpoint, schedules the
+ /// requested delay, and re-invokes the Lambda — none of which the unit
+ /// tests can prove (they fake state transitions in-memory).
+ ///
+ [Fact]
+ public async Task FlakyStep_RetriesAndSucceedsOnThirdAttempt()
+ {
+ await using var deployment = await DurableFunctionDeployment.CreateAsync(
+ DurableFunctionDeployment.FindTestFunctionDir("RetryFunction"),
+ "retry", _output);
+
+ var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+ var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+ _output.WriteLine($"Response: {responsePayload}");
+
+ // Initial invoke returns when the SDK suspends after the first failure.
+ // The execution continues asynchronously via service-driven re-invokes.
+ var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+ Assert.NotNull(arn);
+
+ // Total expected wall time: 2s + 4s of retry delay + execution overhead.
+ // Allow generous headroom for service scheduling latency.
+ var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+ Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+ var history = await deployment.WaitForHistoryAsync(
+ arn!,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
+ && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
+ TimeSpan.FromSeconds(60));
+ var events = history.Events ?? new List();
+
+ // Three attempts ran (attempts 1, 2, 3).
+ Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));
+
+ // Two failed attempts recorded retry metadata; the final attempt succeeded.
+ Assert.Equal(2, events.Count(e => e.StepFailedDetails != null && e.Name == "flaky_step"));
+ var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "flaky_step");
+ Assert.NotNull(succeeded);
+ Assert.Equal("\"ok on attempt 3\"", succeeded!.StepSucceededDetails.Result?.Payload);
+
+ // The two recorded failure messages reflect the per-attempt exception.
+ var failures = events
+ .Where(e => e.StepFailedDetails != null && e.Name == "flaky_step")
+ .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+ .ToList();
+ Assert.Contains(failures, m => m.Contains("attempt 1"));
+ Assert.Contains(failures, m => m.Contains("attempt 2"));
+
+ // Timing check: the service must have actually waited between attempts.
+ // With initialDelay=2s, backoffRate=2.0, no jitter: delays are 2s and 4s.
+ // The gap between the first and last StepStarted should be >= 6s.
+ var startedTimestamps = events
+ .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
+ .OrderBy(e => e.EventTimestamp!.Value)
+ .Select(e => e.EventTimestamp!.Value)
+ .ToList();
+ var totalGap = startedTimestamps[^1] - startedTimestamps[0];
+ _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
+ Assert.True(totalGap >= TimeSpan.FromSeconds(6),
+ $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
index 7b2afd427..b51e26b2d 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
@@ -36,10 +36,13 @@ public async Task StepFails_PropagatesAsFailedStatus()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false,
+ h => (h.Events?.Any(e => e.EventType == EventType.StepStarted) ?? false)
+ && (h.Events?.Any(e => e.StepFailedDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new List();
+ Assert.Equal(1, events.Count(e => e.EventType == EventType.StepStarted));
+
// The failing step recorded a StepFailed event with the exception message.
var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "fail_step");
Assert.NotNull(stepFailed);
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
index 684486dd9..05e2bfc72 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
@@ -32,11 +32,14 @@ public async Task StepWaitStep_CompletesViaService()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
&& (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new List();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Both steps ran in order and produced the expected chained outputs.
var stepResults = events
.Where(e => e.StepSucceededDetails != null)
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/AtMostOnceCrashFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs
new file mode 100644
index 000000000..0bfda43cd
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs
@@ -0,0 +1,69 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+///
+/// Exercises the AtMostOncePerRetry crash-recovery path end-to-end.
+///
+/// On attempt 1 the step kills the Lambda process AFTER the START checkpoint
+/// has been flushed but BEFORE any SUCCEED checkpoint can be written. The
+/// service re-invokes us; replay sees STARTED with no terminal record, so the
+/// SDK routes through the retry strategy with a synthesized
+/// StepInterruptedException. Attempt 2 succeeds normally.
+///
+/// The per-attempt counter is read from the input payload — the durable
+/// service preserves it across re-invokes so we can drive deterministic crash
+/// behavior on attempt 1 only.
+///
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ if (ctx.AttemptNumber == 1)
+ {
+ // Hard process exit AFTER the SDK has flushed the START
+ // checkpoint (sync flush is part of the AtMostOncePerRetry
+ // contract). The service will see a STARTED record with no
+ // terminal counterpart on the next invocation.
+ Environment.Exit(137);
+ }
+ return $"recovered on attempt {ctx.AttemptNumber}";
+ },
+ name: "crash_then_recover",
+ config: new StepConfig
+ {
+ Semantics = StepSemantics.AtMostOncePerRetry,
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(5),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None)
+ });
+
+ return new TestResult { Status = "completed", Data = result };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs
new file mode 100644
index 000000000..dbb4b0d3b
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs
@@ -0,0 +1,57 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+///
+/// Five-failure retry chain: the step throws on attempts 1-5 and succeeds on
+/// attempt 6. The result payload echoes ctx.AttemptNumber on each attempt so
+/// the integration test can verify the SDK's user-facing attempt counter
+/// matches the wire-format StepDetails.Attempt value across multiple
+/// invocations.
+///
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ if (ctx.AttemptNumber < 6)
+ throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}");
+ return $"ok on attempt {ctx.AttemptNumber}";
+ },
+ name: "long_retry_step",
+ config: new StepConfig
+ {
+ // Short delays so the test wall time stays manageable: 1s, 2s, 3s, 4s, 5s.
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 6,
+ initialDelay: TimeSpan.FromSeconds(1),
+ maxDelay: TimeSpan.FromSeconds(5),
+ backoffRate: 1.5,
+ jitter: JitterStrategy.None)
+ });
+
+ return new TestResult { Status = "completed", Data = result };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/LongRetryChainFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs
new file mode 100644
index 000000000..5a6551ec2
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs
@@ -0,0 +1,47 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ throw new InvalidOperationException($"always-fails attempt {ctx.AttemptNumber}");
+ },
+ name: "always_fails_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(10),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None)
+ });
+
+ return new TestResult { Status = "completed", Data = result };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/RetryExhaustionFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
new file mode 100644
index 000000000..9ebffdf11
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
@@ -0,0 +1,49 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ if (ctx.AttemptNumber < 3)
+ throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}");
+ return $"ok on attempt {ctx.AttemptNumber}";
+ },
+ name: "flaky_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(10),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None)
+ });
+
+ return new TestResult { Status = "completed", Data = result };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
index 7f362d605..50e511cb1 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
@@ -620,4 +620,306 @@ private class TestPerson
public string? Name { get; set; }
public int Age { get; set; }
}
+
+ #region StepAsync Retry Tests
+
+ [Fact]
+ public async Task StepAsync_FailsWithRetryStrategy_CheckpointsRetryAndSuspends()
+ {
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(null);
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var stepTask = context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); },
+ name: "flaky_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(5),
+ jitter: JitterStrategy.None)
+ });
+
+ await Task.Delay(50);
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+
+ // Fresh attempt 1 emits a fire-and-forget START (telemetry under
+ // AtLeastOncePerRetry), then a RETRY when the user code throws and
+ // the retry strategy decides to retry.
+ var checkpoints = recorder.Flushed;
+ Assert.Equal(2, checkpoints.Count);
+ Assert.Equal("START", checkpoints[0].Action);
+ Assert.Equal("RETRY", checkpoints[1].Action);
+ Assert.Equal(IdAt(1), checkpoints[1].Id);
+ Assert.Equal(5, checkpoints[1].StepOptions.NextAttemptDelaySeconds);
+ }
+
+ [Fact]
+ public async Task StepAsync_FailsNoRetryStrategy_CheckpointsFail()
+ {
+ var context = CreateContext();
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); },
+ name: "fail_step"));
+
+ Assert.Equal("permanent", ex.Message);
+ }
+
+ [Fact]
+ public async Task StepAsync_RetryExhausted_CheckpointsFail()
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 2,
+ NextAttemptTimestamp = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds()
+ }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ // Attempt 3 (last one) — should fail after this
+ var ex = await Assert.ThrowsAsync(() =>
+ context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); },
+ name: "exhaust_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+ }));
+
+ Assert.Equal("still failing", ex.Message);
+
+ // Fresh attempt 3 emits a fire-and-forget START (telemetry under
+ // AtLeastOncePerRetry), then a FAIL after the retry strategy gives up.
+ var checkpoints = recorder.Flushed;
+ Assert.Equal(2, checkpoints.Count);
+ Assert.Equal("START", checkpoints[0].Action);
+ Assert.Equal("FAIL", checkpoints[1].Action);
+ }
+
+ [Fact]
+ public async Task StepAsync_PendingWithFutureTimestamp_Suspends()
+ {
+ var futureMs = DateTimeOffset.UtcNow.AddSeconds(300).ToUnixTimeMilliseconds();
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 1,
+ NextAttemptTimestamp = futureMs
+ }
+ }
+ }
+ });
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var stepTask = context.StepAsync(
+ async (_) => { await Task.CompletedTask; return "should not run"; },
+ name: "pending_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ await Task.Delay(50);
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+ Assert.Empty(recorder.Flushed);
+ }
+
+ [Fact]
+ public async Task StepAsync_PendingWithPastTimestamp_ReExecutes()
+ {
+ var pastMs = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 1,
+ NextAttemptTimestamp = pastMs
+ }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ Assert.Equal(2, ctx.AttemptNumber);
+ return "retry success";
+ },
+ name: "retry_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ Assert.Equal("retry success", result);
+ }
+
+ [Fact]
+ public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes()
+ {
+ // READY = service has post-PENDING re-invoked us; the retry timer
+ // already fired so no timestamp check is needed. Just advance the
+ // attempt counter and run.
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Ready,
+ StepDetails = new StepDetails { Attempt = 2 }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+ var executed = false;
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ executed = true;
+ Assert.Equal(3, ctx.AttemptNumber);
+ await Task.CompletedTask;
+ return "ok";
+ },
+ name: "ready_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ Assert.True(executed);
+ Assert.Equal("ok", result);
+ Assert.False(tm.IsTerminated);
+ Assert.False(state.IsReplaying);
+ }
+
+ [Fact]
+ public async Task StepAsync_AtMostOnce_FlushesStartBeforeExecution()
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(null);
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ IReadOnlyList? flushedAtFuncEntry = null;
+
+ var result = await context.StepAsync(
+ async (_) =>
+ {
+ flushedAtFuncEntry = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+ await Task.CompletedTask;
+ return "done";
+ },
+ name: "amo_step",
+ config: new StepConfig { Semantics = StepSemantics.AtMostOncePerRetry });
+
+ Assert.Equal("done", result);
+
+ // START must be flushed before user func runs (AtMostOnce invariant).
+ Assert.NotNull(flushedAtFuncEntry);
+ Assert.Equal(new[] { "START" }, flushedAtFuncEntry);
+
+ // After step returns, SUCCEED has also been flushed.
+ var actions = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+ Assert.Equal(new[] { "START", "SUCCEED" }, actions);
+ }
+
+ [Fact]
+ public async Task StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler()
+ {
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Started
+ }
+ }
+ });
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = CreateLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var executed = false;
+ var stepTask = context.StepAsync(
+ async (_) => { executed = true; await Task.CompletedTask; return "should not run"; },
+ name: "amo_replay",
+ config: new StepConfig
+ {
+ Semantics = StepSemantics.AtMostOncePerRetry,
+ RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+ });
+
+ await Task.Delay(50);
+
+ Assert.False(executed);
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+
+ var checkpoints = recorder.Flushed;
+ Assert.Single(checkpoints);
+ Assert.Equal("RETRY", checkpoints[0].Action);
+ }
+
+ #endregion
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
index a967c1ce2..2e3841c89 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
@@ -198,30 +198,37 @@ public async Task WrapAsync_CheckpointsAreSentToService()
mockClient);
Assert.Equal(InvocationStatus.Pending, output.Status);
- Assert.Equal(2, mockClient.CheckpointCalls.Count);
-
- // First flush: step SUCCEED (the user awaits StepAsync, which awaits
- // its SUCCEED enqueue, which blocks until the batcher flushes it).
- var firstCall = mockClient.CheckpointCalls[0];
- Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", firstCall.DurableExecutionArn);
- Assert.Equal("initial-token", firstCall.CheckpointToken);
- Assert.Single(firstCall.Updates);
- var stepUpdate = firstCall.Updates[0];
- Assert.Equal("STEP", stepUpdate.Type);
- Assert.Equal("SUCCEED", stepUpdate.Action);
- Assert.Equal("validate", stepUpdate.Name);
- Assert.NotNull(stepUpdate.Payload);
-
- // Second flush: wait START (blocks until the service has the timer
- // recorded before WaitAsync suspends).
- var secondCall = mockClient.CheckpointCalls[1];
- Assert.Single(secondCall.Updates);
- var waitUpdate = secondCall.Updates[0];
- Assert.Equal("WAIT", waitUpdate.Type);
- Assert.Equal("START", waitUpdate.Action);
- Assert.Equal("delay", waitUpdate.Name);
- Assert.NotNull(waitUpdate.WaitOptions);
- Assert.Equal(30, waitUpdate.WaitOptions.WaitSeconds);
+
+ // Each StepAsync emits a fire-and-forget START before user code runs
+ // (telemetry under AtLeastOncePerRetry). With FlushInterval = 0 the
+ // worker may flush the START on its own before SUCCEED arrives, so the
+ // exact batching of START vs SUCCEED is timing-dependent. Assert on
+ // the flat sequence of updates instead.
+ var allUpdates = mockClient.CheckpointCalls
+ .SelectMany(c => c.Updates)
+ .ToList();
+
+ // Expect: step START, step SUCCEED, wait START (in that order).
+ Assert.Equal(3, allUpdates.Count);
+
+ Assert.Equal("STEP", allUpdates[0].Type);
+ Assert.Equal("START", allUpdates[0].Action);
+ Assert.Equal("validate", allUpdates[0].Name);
+
+ Assert.Equal("STEP", allUpdates[1].Type);
+ Assert.Equal("SUCCEED", allUpdates[1].Action);
+ Assert.Equal("validate", allUpdates[1].Name);
+ Assert.NotNull(allUpdates[1].Payload);
+
+ Assert.Equal("WAIT", allUpdates[2].Type);
+ Assert.Equal("START", allUpdates[2].Action);
+ Assert.Equal("delay", allUpdates[2].Name);
+ Assert.NotNull(allUpdates[2].WaitOptions);
+ Assert.Equal(30, allUpdates[2].WaitOptions.WaitSeconds);
+
+ // The first call sends the initial checkpoint token.
+ Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", mockClient.CheckpointCalls[0].DurableExecutionArn);
+ Assert.Equal("initial-token", mockClient.CheckpointCalls[0].CheckpointToken);
}
[Fact]
@@ -502,9 +509,9 @@ public async Task WrapAsync_CheckpointThrowsTransient_PropagatesToHost(AmazonSer
public async Task WrapAsync_HydrationThrows_AlwaysPropagatesToHost()
{
// State hydration is OUTSIDE the IsTerminalCheckpointError try/catch — every
- // GetExecutionStateAsync failure escapes for Lambda retry, matching Python's
- // GetExecutionStateError (an InvocationError). Use a 4xx that *would* be terminal
- // if it came from a checkpoint flush to prove the path isn't classified.
+ // GetExecutionStateAsync failure escapes for Lambda retry. Use a 4xx that
+ // *would* be terminal if it came from a checkpoint flush to prove the path
+ // isn't classified.
var input = new DurableExecutionInvocationInput
{
DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:hydrate-fail",
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs
index cacc68a62..ea4fe0b03 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs
@@ -49,8 +49,7 @@ public void LoadFromCheckpoint_OnlyExecutionInputOp_NotReplaying()
{
// The service sends one EXECUTION-type op carrying the input payload
// even on the first invocation. That op is bookkeeping, not user
- // history — it must not put us into replay mode. (Matches Python
- // execution.py:258, Java ExecutionManager:81, JS execution-context.ts:62.)
+ // history — it must not put us into replay mode.
var state = new ExecutionState();
state.LoadFromCheckpoint(new InitialExecutionState
{
@@ -105,8 +104,8 @@ public void TrackReplay_PendingOpDoesNotBlockTransition()
{
// A PENDING op (e.g. retry timer waiting) is not "completed" in the
// checkpoint sense — once the workflow has visited every terminally-
- // completed op the SDK treats subsequent code as fresh. Matches Python's
- // {SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT} terminal set.
+ // completed op the SDK treats subsequent code as fresh. Terminal set
+ // is {SUCCEEDED, FAILED, CANCELLED, STOPPED}.
var state = new ExecutionState();
state.LoadFromCheckpoint(new InitialExecutionState
{
@@ -199,8 +198,8 @@ public void HasOperation_ReturnsTrueForExisting()
public void GetOperation_ReturnsLatestRecord_WhenIdAppearsMultipleTimes()
{
// Wire format: when the service replays an envelope it includes the
- // most recent record per ID. Java/Python/JS reference SDKs all key by
- // ID alone and rely on the service to provide the authoritative record.
+ // most recent record per ID. We key by ID alone and rely on the service
+ // to provide the authoritative record.
var state = new ExecutionState();
state.LoadFromCheckpoint(new InitialExecutionState
{
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
new file mode 100644
index 000000000..e5a277fb6
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
@@ -0,0 +1,202 @@
+using Amazon.Lambda.DurableExecution;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class RetryStrategyTests
+{
+ [Fact]
+ public void ExponentialDefault_RetriesUpToMaxAttempts()
+ {
+ var strategy = RetryStrategy.Default;
+
+ // Attempts 1-5 should retry (maxAttempts=6 means 6 total attempts)
+ for (int i = 1; i < 6; i++)
+ {
+ var decision = strategy.ShouldRetry(new InvalidOperationException("fail"), i);
+ Assert.True(decision.ShouldRetry);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ }
+
+ // Attempt 6 should not retry (exhausted)
+ var lastDecision = strategy.ShouldRetry(new InvalidOperationException("fail"), 6);
+ Assert.False(lastDecision.ShouldRetry);
+ }
+
+ [Fact]
+ public void None_NeverRetries()
+ {
+ var strategy = RetryStrategy.None;
+
+ var decision = strategy.ShouldRetry(new Exception("fail"), 1);
+ Assert.False(decision.ShouldRetry);
+ }
+
+ [Fact]
+ public void Transient_RetriesUpTo3Attempts()
+ {
+ var strategy = RetryStrategy.Transient;
+
+ Assert.True(strategy.ShouldRetry(new Exception("fail"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("fail"), 2).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception("fail"), 3).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_DelayIncreases()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(120),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None);
+
+ var d1 = strategy.ShouldRetry(new Exception(), 1).Delay;
+ var d2 = strategy.ShouldRetry(new Exception(), 2).Delay;
+ var d3 = strategy.ShouldRetry(new Exception(), 3).Delay;
+
+ // With no jitter: 2s, 4s, 8s (ceiling to whole seconds)
+ Assert.Equal(TimeSpan.FromSeconds(2), d1);
+ Assert.Equal(TimeSpan.FromSeconds(4), d2);
+ Assert.Equal(TimeSpan.FromSeconds(8), d3);
+ }
+
+ [Fact]
+ public void Exponential_DelayCapsAtMax()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 10,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(30),
+ backoffRate: 3.0,
+ jitter: JitterStrategy.None);
+
+ // Attempt 3: 10 * 3^2 = 90, capped to 30
+ var decision = strategy.ShouldRetry(new Exception(), 3);
+ Assert.Equal(TimeSpan.FromSeconds(30), decision.Delay);
+ }
+
+ [Fact]
+ public void Exponential_FullJitter_BoundedByDelay()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(100),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Full);
+
+ // Run multiple times to check bounds
+ for (int i = 0; i < 50; i++)
+ {
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+ }
+ }
+
+ [Fact]
+ public void Exponential_HalfJitter_BoundedBetween50And100Percent()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(100),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Half);
+
+ for (int i = 0; i < 50; i++)
+ {
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(5));
+ Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+ }
+ }
+
+ [Fact]
+ public void Exponential_RetryableExceptions_FiltersCorrectly()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(TimeoutException), typeof(HttpRequestException) });
+
+ Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new HttpRequestException(), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_RetryableExceptions_MatchesDerivedTypes()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(IOException) });
+
+ Assert.True(strategy.ShouldRetry(new FileNotFoundException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_MessagePatterns_FiltersCorrectly()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableMessagePatterns: new[] { "timeout", "throttl", "5\\d{2}" });
+
+ Assert.True(strategy.ShouldRetry(new Exception("connection timeout"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("request throttled"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("HTTP 503"), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception("not found"), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_BothFilters_EitherMatches()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(TimeoutException) },
+ retryableMessagePatterns: new[] { "throttl" });
+
+ // Matches exception type
+ Assert.True(strategy.ShouldRetry(new TimeoutException("any message"), 1).ShouldRetry);
+ // Matches message pattern
+ Assert.True(strategy.ShouldRetry(new Exception("throttled"), 1).ShouldRetry);
+ // Matches neither
+ Assert.False(strategy.ShouldRetry(new InvalidOperationException("bad state"), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_NoFilters_RetriesAllExceptions()
+ {
+ var strategy = RetryStrategy.Exponential(maxAttempts: 3);
+
+ Assert.True(strategy.ShouldRetry(new Exception("anything"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new OutOfMemoryException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_MinimumDelayIsOneSecond()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromMilliseconds(100),
+ jitter: JitterStrategy.None);
+
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ }
+
+ [Fact]
+ public void FromDelegate_UsesProvidedFunction()
+ {
+ var strategy = RetryStrategy.FromDelegate((ex, attempt) =>
+ attempt < 2 && ex is TimeoutException
+ ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(5))
+ : RetryDecision.DoNotRetry());
+
+ Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new TimeoutException(), 2).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception(), 1).ShouldRetry);
+ }
+}