Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,23 @@ public StepException(string message) : base(message) { }
/// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
public StepException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Signals that a step running under <see cref="StepSemantics.AtMostOncePerRetry"/>
/// was found to have been cut short during an earlier invocation: on replay a
/// <c>STARTED</c> checkpoint exists for the step, but no terminal record follows it.
/// </summary>
/// <remarks>
/// This exception is handed to <see cref="IRetryStrategy.ShouldRetry"/>, which lets a
/// user-supplied strategy tell "my code threw" apart from "a previous attempt
/// crashed before it could record a result".
/// </remarks>
public class StepInterruptedException : StepException
{
    /// <summary>
    /// Initializes a <see cref="StepInterruptedException"/> with no message and no inner exception.
    /// </summary>
    public StepInterruptedException()
        : base()
    {
    }

    /// <summary>
    /// Initializes a <see cref="StepInterruptedException"/> carrying the supplied message.
    /// </summary>
    /// <param name="message">Text describing the interruption.</param>
    public StepInterruptedException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Initializes a <see cref="StepInterruptedException"/> carrying the supplied message
    /// and the exception that is being wrapped.
    /// </summary>
    /// <param name="message">Text describing the interruption.</param>
    /// <param name="innerException">The underlying exception to wrap.</param>
    public StepInterruptedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput
///
/// State-hydration errors (<c>GetExecutionStateAsync</c>) propagate as
/// <see cref="DurableExecutionException"/> too, but they are NOT caught here — they
/// flow up to the host so Lambda retries, matching Python's <c>GetExecutionStateError</c>
/// (which extends <c>InvocationError</c>).
/// flow up to the host so Lambda retries.
///
/// User-code SDK errors (e.g. an SDK call inside a Step body) are caught by
/// <c>StepRunner</c> and surfaced as <c>StepException</c> for the workflow's normal
Expand Down
39 changes: 39 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IRetryStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Determines whether a failed step should be retried and with what delay.
/// </summary>
/// <remarks>
/// Implementations inspect the failure and the attempt count and answer with a
/// <see cref="RetryDecision"/>: either <see cref="RetryDecision.DoNotRetry"/> or
/// <see cref="RetryDecision.RetryAfter(TimeSpan)"/>.
/// </remarks>
public interface IRetryStrategy
{
/// <summary>
/// Evaluates whether the given exception warrants a retry.
/// </summary>
/// <param name="exception">
/// The exception that caused the step to fail. This may be a
/// <c>StepInterruptedException</c>, which indicates that a previous attempt was
/// interrupted before it could record a result rather than that user code threw.
/// </param>
/// <param name="attemptNumber">The 1-based attempt number that just failed.</param>
/// <returns>
/// A decision indicating whether to retry and the delay before the next attempt.
/// </returns>
RetryDecision ShouldRetry(Exception exception, int attemptNumber);
}

/// <summary>
/// Immutable result of a retry evaluation: whether the step should run again
/// and how long to wait before it does.
/// </summary>
/// <remarks>
/// The constructor is private; build values through <see cref="DoNotRetry"/>
/// or <see cref="RetryAfter"/>.
/// </remarks>
public readonly struct RetryDecision
{
    // Single assignment point for both properties; only the factories below call it.
    private RetryDecision(bool shouldRetry, TimeSpan delay) =>
        (ShouldRetry, Delay) = (shouldRetry, delay);

    /// <summary><c>true</c> when the step should be attempted again.</summary>
    public bool ShouldRetry { get; }

    /// <summary>How long to wait before the next attempt is made.</summary>
    public TimeSpan Delay { get; }

    /// <summary>Builds a decision stating that no further attempt should be made.</summary>
    /// <returns>A decision with <see cref="ShouldRetry"/> unset and a zero <see cref="Delay"/>.</returns>
    public static RetryDecision DoNotRetry()
    {
        return new RetryDecision(shouldRetry: false, delay: TimeSpan.Zero);
    }

    /// <summary>Builds a decision stating that the step should be retried after a wait.</summary>
    /// <param name="delay">The wait to apply before the next attempt; stored as given.</param>
    /// <returns>A decision with <see cref="ShouldRetry"/> set and <see cref="Delay"/> equal to <paramref name="delay"/>.</returns>
    public static RetryDecision RetryAfter(TimeSpan delay)
    {
        return new RetryDecision(shouldRetry: true, delay: delay);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// call awaits the flush of its containing batch (sync semantics).
/// </summary>
/// <remarks>
/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when
/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need
/// a fire-and-forget overload like
/// <c>Task EnqueueAsync(SdkOperationUpdate update, bool sync)</c> where
/// <c>sync=false</c> returns as soon as the item is queued. Java's
/// <c>sendOperationUpdate</c> vs <c>sendOperationUpdateAsync</c> is the model.
/// Today every call site is sync, so the API stays minimal.
/// Fire-and-forget semantics are achieved by simply not awaiting the returned
/// Task. Errors still surface deterministically via <c>_terminalError</c>: the
/// next sync <see cref="EnqueueAsync"/> or <see cref="DrainAsync"/> rethrows.
/// Callers using fire-and-forget should observe the discarded Task's exception
/// (see <c>StepOperation.FireAndForget</c>) so it doesn't trip the runtime's
/// <c>UnobservedTaskException</c> event.
/// </remarks>
internal sealed class CheckpointBatcher : IAsyncDisposable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ internal sealed class CheckpointBatcherConfig
/// <remarks>
/// TODO: not enforced today. The worker only checks <see cref="MaxBatchOperations"/>;
/// a single oversized item (or a batch whose serialized size exceeds 750 KB)
/// will be sent to the service and rejected there. Java/JS/Python all
/// pre-flight this on the in-flight batch and split before the next add.
/// Wire this in alongside the async-flush operations (Map / Parallel /
/// child-context) since those are the scenarios that can actually fill a
/// batch — today every batch is 1 item with <see cref="FlushInterval"/>
/// = Zero, so the gap is latent.
/// will be sent to the service and rejected there. Wire this in alongside
/// the async-flush operations (Map / Parallel / child-context) since those
/// are the scenarios that can actually fill a batch — today every batch is
/// 1 item with <see cref="FlushInterval"/> = Zero, so the gap is latent.
/// </remarks>
internal int MaxBatchBytes { get; init; } = 750 * 1024;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enforcing the max batch bytes limit is still a TODO item, tracked in another PR.

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// <see cref="CheckpointBatcher"/>; this type is the inbound side only.
/// </summary>
/// <remarks>
/// Replay tracking mirrors the Python / Java / JavaScript reference SDKs:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updating the docs here (and everywhere else) to remove references such as "similar to Python/JS".

/// <list type="bullet">
/// <item>At construction the workflow is "replaying" if and only if any user-replayable
/// op is present. The service always sends one <c>EXECUTION</c>-type op
Expand Down Expand Up @@ -41,11 +40,15 @@ public void LoadFromCheckpoint(InitialExecutionState? initialState)
AddOperations(initialState.Operations);
}

// Only user-replayable ops put us into replay mode. The service-side
// EXECUTION op (input payload bookkeeping) is always present and must
// not count — see Python execution.py:258 / Java ExecutionManager:81 /
// JS execution-context.ts:62 for the same rule.
(_isReplaying, _remainingReplayOps) = ScanReplayable();
// We're "replaying" when there are completed ops (SUCCEEDED, FAILED,
// CANCELLED, STOPPED) we need to re-derive before resuming live work.
// The service-side EXECUTION op (input payload bookkeeping) is always
// present and doesn't count. If the only ops are in-progress
// (READY/PENDING/STARTED), there's nothing to re-derive — the next
// user call IS the next thing to run — so IsReplaying starts false.
var (_, terminalCount) = ScanReplayable();
_remainingReplayOps = terminalCount;
_isReplaying = terminalCount > 0;
Comment thread
GarrettBeatty marked this conversation as resolved.
}

public void AddOperations(IEnumerable<Operation> operations)
Expand Down Expand Up @@ -91,8 +94,11 @@ public void TrackReplay(string operationId)

public void ValidateReplayConsistency(string operationId, string expectedType, string? expectedName)
{
if (!_isReplaying) return;

// Independent of IsReplaying: as long as a checkpoint record exists
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change catches the following scenario:

  1. Deploy v1: workflow calls StepAsync(name: "fetch") first.
  2. The step fails, the SDK writes a RETRY checkpoint, the service stores Operation { Id: hash("1"), Type: "STEP", Status: "PENDING" }, and re-invokes after the delay.
  3. Before re-invoke, deploy v2: workflow now calls WaitAsync(name: "fetch") first instead. (The user shouldn't do this, but it's the exact scenario ValidateReplayConsistency is supposed to catch.)
  4. Service re-invokes Lambda. The checkpoint envelope contains the PENDING step record at hash("1"), so _isReplaying = false (no terminal ops, only the PENDING one).
  5. User code reaches the first await: WaitAsync(name: "fetch") at hash("1"), and ValidateReplayConsistency is called.

With the old if (!_isReplaying) return; short-circuit at the top, this validation would return early — even though _operations[hash("1")] exists with Type = "STEP" and our user is asking for type "WAIT". The mismatch slips through silently. Then ReplayAsync runs against a STEP record while user code expects a WAIT, producing weird downstream behavior.

// for this id, its type/name must match what user code is asking for.
// If the only checkpointed ops are in-progress (PENDING/READY/STARTED),
// IsReplaying is false but the records still exist and code drift can
// still produce a mismatch.
if (!_operations.TryGetValue(operationId, out var op)) return;

if (op.Type != null && op.Type != expectedType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// <summary>
/// Generates deterministic operation IDs for durable operations. Each call
/// increments an internal counter and SHA-256 hashes <c>"&lt;parentId&gt;-&lt;counter&gt;"</c>
/// (or just <c>"&lt;counter&gt;"</c> at the root). Hashing matches the wire format
/// used by the Java/JS/Python SDKs so the same workflow position produces a
/// stable, opaque ID across replays — and the human-readable step name is
/// carried separately on <c>OperationUpdate.Name</c>, so renaming a step does
/// not break replay correlation.
/// (or just <c>"&lt;counter&gt;"</c> at the root). The same workflow position
/// produces a stable, opaque ID across replays — and the human-readable step
/// name is carried separately on <c>OperationUpdate.Name</c>, so renaming a
/// step does not break replay correlation.
/// </summary>
internal sealed class OperationIdGenerator
{
Expand Down Expand Up @@ -46,7 +45,7 @@ public OperationIdGenerator(string? parentId)

/// <summary>
/// Generates the next operation ID. The counter is pre-incremented so the
/// first ID is <c>hash("1")</c>, matching the reference SDKs.
/// first ID is <c>hash("1")</c>.
/// </summary>
/// <remarks>
/// Uses <see cref="Interlocked.Increment(ref int)"/> so concurrent callers
Expand All @@ -55,7 +54,7 @@ public OperationIdGenerator(string? parentId)
/// <c>MapAsync</c> branches that fan out before awaiting) cannot collide
/// on the same ID. Determinism still requires that calls happen in a
/// deterministic order — atomicity prevents duplicate IDs but not
/// reordering between replays. Matches Java's <c>AtomicInteger.incrementAndGet</c>.
/// reordering between replays.
/// </remarks>
public string NextId()
{
Expand Down
Loading
Loading