From 8a6c41c33128fa11681c125e66c1e743e025e966 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Thu, 14 May 2026 13:19:31 -0400 Subject: [PATCH 1/2] Add RunInChildContextAsync Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function. Public surface: - IDurableContext.RunInChildContextAsync (reflection + AOT-safe ICheckpointSerializer overloads, plus a void overload). - ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller). - ChildContextException for failure surfacing. Used as a building block for upcoming WaitForCallbackAsync. Co-Authored-By: Claude Opus 4.7 (1M context) update docs (#2372) Co-authored-by: Claude Opus 4.7 (1M context) --- Docs/durable-execution-design.md | 140 +++--- .../ChildContextConfig.cs | 32 ++ .../DurableContext.cs | 55 +- .../DurableExecutionException.cs | 28 ++ .../IDurableContext.cs | 41 ++ .../Internal/ChildContextOperation.cs | 201 ++++++++ .../Internal/DurableOperation.cs | 3 + .../Internal/StepOperation.cs | 7 +- .../Internal/WaitOperation.cs | 4 +- .../Services/LambdaDurableServiceClient.cs | 16 +- .../ChildContextFailsTest.cs | 93 ++++ .../ChildContextRetryFailsTest.cs | 111 ++++ .../ChildContextTest.cs | 109 ++++ .../ChildContextFailsFunction.csproj | 18 + .../ChildContextFailsFunction/Dockerfile | 7 + .../ChildContextFailsFunction/Function.cs | 45 ++ .../ChildContextFunction.csproj | 18 + .../ChildContextFunction/Dockerfile | 7 + .../ChildContextFunction/Function.cs | 51 ++ .../ChildContextRetryFailsFunction.csproj | 18 + .../ChildContextRetryFailsFunction/Dockerfile | 7 + .../Function.cs | 58 +++ .../ChildContextOperationTests.cs | 473 ++++++++++++++++++ .../DurableContextTests.cs | 7 +- .../LambdaDurableServiceClientTests.cs | 103 ++++ 25 files changed, 1578 insertions(+), 74 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextFailsTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextRetryFailsTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/ChildContextFailsFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/ChildContextFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/ChildContextRetryFailsFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md index f639daa7a..496e7ec9c 100644 --- a/Docs/durable-execution-design.md +++ b/Docs/durable-execution-design.md @@ -1197,10 +1197,9 @@ public class StepConfig public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry; // Note: there is no Serializer property here. Step result serialization - // is delegated to the ILambdaSerializer registered on ILambdaContext.Serializer - // (assembly attribute or LambdaBootstrapBuilder.Create). To swap the - // step-checkpoint format for a single step, the planned route is the - // StepAsync(..., ICheckpointSerializer, ...) overload (post-v1). + // is delegated to the ILambdaSerializer registered on + // ILambdaContext.Serializer (assembly attribute or + // LambdaBootstrapBuilder.Create). } public enum StepSemantics @@ -1231,10 +1230,9 @@ public class CallbackConfig /// public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.Zero; - /// - /// Custom serializer for callback result. - /// - public ICheckpointSerializer? Serializer { get; set; } + // Note: there is no Serializer property here. Callback result + // serialization flows through the ILambdaSerializer registered on + // ILambdaContext.Serializer, the same as StepAsync. } /// @@ -1259,14 +1257,14 @@ public class InvokeConfig public TimeSpan Timeout { get; set; } = TimeSpan.Zero; /// - /// Custom serializer for the payload. + /// Optional tenant identifier propagated to the chained invocation. + /// Matches the tenantId field on Python/JS/Java InvokeConfig. /// - public ICheckpointSerializer? PayloadSerializer { get; set; } + public string? TenantId { get; set; } - /// - /// Custom serializer for the result. - /// - public ICheckpointSerializer? ResultSerializer { get; set; } + // Note: there are no payload/result serializer properties here. Both + // flow through the ILambdaSerializer registered on + // ILambdaContext.Serializer, the same as StepAsync. } /// @@ -1381,10 +1379,9 @@ public class CompletionConfig /// public class ChildContextConfig { - /// - /// Custom serializer for the child context's return value. - /// - public ICheckpointSerializer? Serializer { get; set; } + // Note: there is no Serializer property here. The child context's + // return value is serialized via the ILambdaSerializer registered on + // ILambdaContext.Serializer, the same as StepAsync. /// /// Operation sub-type label for observability (e.g., in test runner output). @@ -1425,34 +1422,54 @@ public class WaitForConditionConfig public interface IBatchResult { /// - /// All items (succeeded and failed). + /// All items, in original index order. /// IReadOnlyList> All { get; } /// - /// Only successful items. + /// Items whose Status is Succeeded. /// IReadOnlyList> Succeeded { get; } /// - /// Only failed items. + /// Items whose Status is Failed. /// IReadOnlyList> Failed { get; } /// - /// Get all successful results. Throws if any failed. + /// Items still in flight when the batch resolved (CompletionConfig short-circuit). + /// + IReadOnlyList> Started { get; } + + /// + /// Get all successful results in original index order. Throws if any failed. /// IReadOnlyList GetResults(); /// - /// Throw an exception if any item failed. + /// Get all errors from failed items. + /// + IReadOnlyList GetErrors(); + + /// + /// Throw a single aggregated exception if any item failed. /// void ThrowIfError(); /// - /// Why the operation completed. + /// True if any item is in the Failed state. + /// + bool HasFailure { get; } + + /// + /// Why the batch resolved. /// CompletionReason CompletionReason { get; } + + int SuccessCount { get; } + int FailureCount { get; } + int StartedCount { get; } + int TotalCount { get; } } public interface IBatchItem @@ -1463,7 +1480,29 @@ public interface IBatchItem DurableExecutionException? Error { get; } } -public enum BatchItemStatus { Succeeded, Failed, Cancelled } +/// +/// Status of an individual item in a batch result. +/// Mirrors the wire-state observed at the time the batch resolved — items still +/// running when a CompletionConfig short-circuits remain in . +/// +public enum BatchItemStatus +{ + /// + /// The branch ran to completion and produced a result. + /// + Succeeded, + + /// + /// The branch ran to completion and threw. + /// + Failed, + + /// + /// The branch was still in flight when the batch's CompletionConfig + /// resolved (e.g., FirstSuccessful returned before this branch finished). + /// + Started +} public enum CompletionReason { AllCompleted, MinSuccessfulReached, FailureToleranceExceeded } /// @@ -1616,32 +1655,17 @@ public class BadResult ### Custom Serialization -Implement `ICheckpointSerializer` for custom serialization: +There is no per-call serializer override on any durable-execution API. Every checkpoint — step results, callback results, invoke payloads/results, child-context results — is serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. To customize, register a different `ILambdaSerializer` for the function: ```csharp -public interface ICheckpointSerializer -{ - string Serialize(T value, SerializationContext context); - T Deserialize(string data, SerializationContext context); -} +// Class library mode — register via the assembly attribute. +[assembly: LambdaSerializer(typeof(MyCustomSerializer))] -public record SerializationContext(string OperationId, string DurableExecutionArn); +// Executable / custom runtime — pass to LambdaBootstrapBuilder.Create. +using var bootstrap = LambdaBootstrapBuilder.Create(handler, new MyCustomSerializer()).Build(); ``` -Usage — pass the serializer to the per-step `StepAsync` overload directly. This is -the only way to override the registered `ILambdaSerializer` for a single step's -checkpoint; it's intentional that there's no `StepConfig.Serializer` knob, so you -have one obvious place to opt in (and the type is `ICheckpointSerializer`, not -a non-generic marker, so the compiler catches a mismatched `T`): - -```csharp -var result = await context.StepAsync( - async () => await GetLargeData(), - new CompressedJsonSerializer(), - name: "get_data"); -``` - -> **Status:** the `ICheckpointSerializer` overload is a planned post-v1 addition. Today, all step checkpoints flow through the `ILambdaSerializer` registered on `ILambdaContext.Serializer` — see [NativeAOT compatibility](#nativeaot-compatibility) for how that's wired. +The customization applies uniformly to the whole function — there is no way today to swap the format for a single step or a single result type. See [NativeAOT compatibility](#nativeaot-compatibility) for how the registration flows in JIT vs. AOT. ### Class library vs. executable output @@ -1722,27 +1746,7 @@ The SDK handles overflow transparently: **Lambda response exceeding 6 MB:** If the final orchestration result exceeds the response payload limit, the SDK checkpoints the result before returning the `DurableExecutionInvocationOutput`. The service reads the result from the checkpoint rather than from the response body. -**Guidance for very large results:** For results that are inherently large (multi-MB payloads), use a custom `ICheckpointSerializer` that offloads to external storage (S3, DynamoDB) and returns a reference. This keeps checkpoint sizes small and avoids pagination overhead: - -```csharp -public class S3BackedSerializer : ICheckpointSerializer -{ - public string Serialize(T value, SerializationContext context) - { - var key = $"results/{context.DurableExecutionArn}/{context.OperationId}"; - // Upload to S3, return the key as the checkpoint value - _s3Client.PutObject(new PutObjectRequest { BucketName = _bucket, Key = key, ... }); - return key; - } - - public T Deserialize(string data, SerializationContext context) - { - // Download from S3 using the stored key - var response = _s3Client.GetObject(new GetObjectRequest { BucketName = _bucket, Key = data }); - return JsonSerializer.Deserialize(response.ResponseStream); - } -} -``` +**Guidance for very large results:** For results that are inherently large (multi-MB payloads), do the offload yourself inside the step — write the payload to external storage (S3, DynamoDB) and return a reference (e.g. an S3 key) from the step. The reference is what the SDK serializes and checkpoints, so the checkpoint stays small and pagination is avoided. Subsequent steps fetch the payload from external storage on demand. --- @@ -1946,7 +1950,7 @@ This is post-v1 work. For the initial release, developers test durable functions - **Lambda runtime:** Requires the managed .NET 8 runtime or a custom runtime (`provided.al2023`) for NativeAOT deployments. - **Durable execution service:** The function must be configured with `DurableConfig` (handled automatically by the `[DurableExecution]` source generator). - **Qualified function identifiers:** `InvokeAsync` requires a version number, alias, or `$LATEST` — unqualified ARNs are not supported for durable invocations. -- **Serializable results:** All step return types must be JSON-serializable (or use a custom `ICheckpointSerializer`). +- **Serializable results:** All step return types must be serializable by the `ILambdaSerializer` registered on `ILambdaContext.Serializer` (default: `System.Text.Json`). --- diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs new file mode 100644 index 000000000..7840211fc --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs @@ -0,0 +1,32 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for a child context. +/// +/// +/// A child context is a logical sub-workflow with its own deterministic +/// operation-ID space, persisted as a CONTEXT operation. Use +/// +/// (and overloads) to run code inside one. +/// +public sealed class ChildContextConfig +{ + /// + /// Operation sub-type label for observability (e.g. "WaitForCallback"). + /// Surfaces on the wire OperationUpdate.SubType field. + /// + public string? SubType { get; set; } + + /// + /// Optional function to transform exceptions thrown by the child context's + /// user function before they surface to the caller. Useful for wrapping + /// low-level errors into domain-specific exceptions. + /// + /// + /// Applied when the user function throws (the mapped exception propagates + /// to the caller of RunInChildContextAsync) and on replay of a + /// FAILED child context (the constructed + /// is mapped before being thrown). + /// + public Func? ErrorMapping { get; set; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index e79cee30b..ec1b58ed6 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -76,7 +76,7 @@ private Task RunStep( var operationId = _idGenerator.NextId(); var op = new StepOperation( - operationId, name, func, config, serializer, Logger, + operationId, name, _idGenerator.ParentId, func, config, serializer, Logger, _state, _terminationManager, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } @@ -99,7 +99,58 @@ public Task WaitAsync( var operationId = _idGenerator.NextId(); var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds)); var op = new WaitOperation( - operationId, name, waitSeconds, + operationId, name, _idGenerator.ParentId, waitSeconds, + _state, _terminationManager, _durableExecutionArn, _batcher); + return op.ExecuteAsync(cancellationToken); + } + + public Task RunInChildContextAsync( + Func> func, + string? name = null, + ChildContextConfig? config = null, + CancellationToken cancellationToken = default) + => RunChildContext(func, name, config, cancellationToken); + + public async Task RunInChildContextAsync( + Func func, + string? name = null, + ChildContextConfig? config = null, + CancellationToken cancellationToken = default) + { + // Void child contexts don't carry a meaningful payload; the wrapper + // returns null so the registered ILambdaSerializer is never asked to + // serialize a real value. + await RunChildContext( + async (ctx) => { await func(ctx); return null; }, + name, config, cancellationToken); + } + + private Task RunChildContext( + Func> func, + string? name, + ChildContextConfig? config, + CancellationToken cancellationToken) + { + var serializer = LambdaContext.Serializer + ?? throw new InvalidOperationException( + "No ILambdaSerializer is registered on ILambdaContext.Serializer. " + + "In the class library programming model, register one with " + + "[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " + + "runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " + + "In tests, set TestLambdaContext.Serializer."); + + var operationId = _idGenerator.NextId(); + + // Capture this DurableContext's collaborators; the child shares state, + // termination, batcher, ARN, and Lambda context — but uses a child + // OperationIdGenerator so its operation IDs are deterministically + // namespaced under the parent op ID. + IDurableContext ChildFactory(string parentOpId) => new DurableContext( + _state, _terminationManager, _idGenerator.CreateChild(parentOpId), + _durableExecutionArn, LambdaContext, _batcher); + + var op = new ChildContextOperation( + operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory, _state, _terminationManager, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs index bb95c8476..4cbb98802 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs @@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { } /// Creates a wrapping an inner exception. public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { } } + +/// +/// Thrown when a child context's user function fails. Surfaces from +/// RunInChildContextAsync; the underlying error is preserved on the +/// // +/// fields. Use to remap into a +/// domain-specific exception. +/// +public class ChildContextException : DurableExecutionException +{ + /// + /// The child context's , if any. + /// + public string? SubType { get; init; } + /// The fully-qualified type name of the original exception. + public string? ErrorType { get; init; } + /// Optional structured error data attached by the user. + public string? ErrorData { get; init; } + /// Stack trace of the original exception, captured before serialization. + public IReadOnlyList? OriginalStackTrace { get; init; } + + /// Creates an empty . + public ChildContextException() { } + /// Creates a with the given message. + public ChildContextException(string message) : base(message) { } + /// Creates a wrapping an inner exception. + public ChildContextException(string message, Exception innerException) : base(message, innerException) { } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index fb49d9e01..7cd0aedcb 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -60,6 +60,47 @@ Task WaitAsync( TimeSpan duration, string? name = null, CancellationToken cancellationToken = default); + + /// + /// Run a user function inside a logical sub-workflow (a "child context"). + /// The child has its own deterministic operation-ID space; its result is + /// checkpointed as a CONTEXT operation so subsequent invocations + /// replay the cached value without re-executing the func. + /// + /// + /// Use child contexts to group related durable operations (e.g. a step plus + /// a wait plus a step) into a single observability/error-handling boundary. + /// On failure, surfaces as ; supply + /// to remap into a + /// domain-specific exception. + /// The child context's return value is serialized to a checkpoint using the + /// registered on + /// . + /// + Task RunInChildContextAsync( + Func> func, + string? name = null, + ChildContextConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Run a user function inside a logical sub-workflow (a "child context") + /// that returns no value. The child has its own deterministic operation-ID + /// space and is checkpointed as a CONTEXT operation so subsequent + /// invocations skip re-executing the func. + /// + /// + /// Use child contexts to group related durable operations (e.g. a step plus + /// a wait plus a step) into a single observability/error-handling boundary. + /// On failure, surfaces as ; supply + /// to remap into a + /// domain-specific exception. + /// + Task RunInChildContextAsync( + Func func, + string? name = null, + ChildContextConfig? config = null, + CancellationToken cancellationToken = default); } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs new file mode 100644 index 000000000..1ed6fce7c --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs @@ -0,0 +1,201 @@ +using System.IO; +using System.Text; +using Amazon.Lambda; +using Amazon.Lambda.Core; +using SdkErrorObject = Amazon.Lambda.Model.ErrorObject; +using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Durable child context operation. Runs a user-supplied function inside a +/// nested with its own deterministic operation-ID +/// space, persisting the function's result so subsequent invocations replay +/// the cached value without re-executing. +/// +/// +/// Replay branches — example: await ctx.RunInChildContextAsync(child => ..., name: "phase") +/// +/// Fresh: no prior state → sync-flush CONTEXT START → run user +/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL +/// and throw . +/// SUCCEEDED: return cached deserialized result; user func is +/// NOT re-executed. +/// FAILED: throw with the +/// recorded error; if is +/// set, the mapped exception is thrown instead. +/// STARTED / PENDING: re-run the user func without +/// re-checkpointing START. The child's own operations recover from their +/// own checkpoints, so this is replay propagation; if a wait/callback +/// inside the child is still pending, the user func re-suspends. +/// +/// Unlike , child contexts have no retry strategy: +/// failure is terminal and surfaces immediately via +/// . +/// +internal sealed class ChildContextOperation : DurableOperation +{ + private readonly Func> _func; + private readonly ChildContextConfig? _config; + private readonly ILambdaSerializer _serializer; + private readonly Func _childContextFactory; + + public ChildContextOperation( + string operationId, + string? name, + string? parentId, + Func> func, + ChildContextConfig? config, + ILambdaSerializer serializer, + Func childContextFactory, + ExecutionState state, + TerminationManager termination, + string durableExecutionArn, + CheckpointBatcher? batcher = null) + : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) + { + _func = func; + _config = config; + _serializer = serializer; + _childContextFactory = childContextFactory; + } + + protected override string OperationType => OperationTypes.Context; + + protected override async Task StartAsync(CancellationToken cancellationToken) + { + // Sync-flush CONTEXT START before user code so the service has a record + // of the parent context if the inner func suspends (e.g. a Wait inside + // the child terminates the workflow before SUCCEED is reached). + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + ParentId = ParentId, + Type = OperationTypes.Context, + Action = OperationAction.START, + SubType = _config?.SubType, + Name = Name + }, cancellationToken); + + return await ExecuteFunc(cancellationToken); + } + + protected override Task ReplayAsync(Operation existing, CancellationToken cancellationToken) + { + switch (existing.Status) + { + case OperationStatuses.Succeeded: + // Side-effecting code runs at most once: replay returns the + // cached result without invoking the user func. + return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result)); + + case OperationStatuses.Failed: + throw MapFailureException(BuildChildContextException(existing)); + + case OperationStatuses.Started: + case OperationStatuses.Pending: + // Re-run the user func: the child's own operations replay from + // their own checkpoints. Do NOT re-checkpoint START — the + // original is still authoritative. If something inside the + // child is still pending (Wait, callback, retry) the user func + // will re-suspend on its own. + return ExecuteFunc(cancellationToken); + + default: + throw new NonDeterministicExecutionException( + $"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); + } + } + + private async Task ExecuteFunc(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var childContext = _childContextFactory(OperationId); + + T result; + try + { + result = await _func(childContext); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + ParentId = ParentId, + Type = OperationTypes.Context, + Action = OperationAction.FAIL, + SubType = _config?.SubType, + Name = Name, + Error = ToSdkError(ex) + }, cancellationToken); + + throw MapFailureException(new ChildContextException(ex.Message, ex) + { + SubType = _config?.SubType, + ErrorType = ex.GetType().FullName + }); + } + + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + ParentId = ParentId, + Type = OperationTypes.Context, + Action = OperationAction.SUCCEED, + SubType = _config?.SubType, + Name = Name, + Payload = SerializeResult(result) + }, cancellationToken); + + return result; + } + + private Exception MapFailureException(ChildContextException ex) + { + var mapper = _config?.ErrorMapping; + if (mapper == null) return ex; + + var mapped = mapper(ex); + return mapped ?? ex; + } + + private ChildContextException BuildChildContextException(Operation failedOp) + { + var err = failedOp.ContextDetails?.Error; + return new ChildContextException(err?.ErrorMessage ?? "Child context failed") + { + SubType = failedOp.SubType ?? _config?.SubType, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace + }; + } + + private T DeserializeResult(string? serialized) + { + if (serialized == null) return default!; + var bytes = Encoding.UTF8.GetBytes(serialized); + using var ms = new MemoryStream(bytes); + return _serializer.Deserialize(ms); + } + + private string SerializeResult(T value) + { + using var ms = new MemoryStream(); + _serializer.Serialize(value, ms); + return Encoding.UTF8.GetString(ms.ToArray()); + } + + private static SdkErrorObject ToSdkError(Exception ex) => new() + { + ErrorType = ex.GetType().FullName, + ErrorMessage = ex.Message, + StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList() + }; +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs index 907d6e128..951a6aca8 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs @@ -14,12 +14,14 @@ internal abstract class DurableOperation protected readonly TerminationManager Termination; protected readonly string OperationId; protected readonly string? Name; + protected readonly string? ParentId; protected readonly string DurableExecutionArn; protected readonly CheckpointBatcher? Batcher; protected DurableOperation( string operationId, string? name, + string? parentId, ExecutionState state, TerminationManager termination, string durableExecutionArn, @@ -27,6 +29,7 @@ protected DurableOperation( { OperationId = operationId; Name = name; + ParentId = parentId; State = state; Termination = termination; DurableExecutionArn = durableExecutionArn; diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs index b764e07c2..cb06ddc89 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs @@ -43,6 +43,7 @@ internal sealed class StepOperation : DurableOperation public StepOperation( string operationId, string? name, + string? parentId, Func> func, StepConfig? config, ILambdaSerializer serializer, @@ -51,7 +52,7 @@ public StepOperation( TerminationManager termination, string durableExecutionArn, CheckpointBatcher? batcher = null) - : base(operationId, name, state, termination, durableExecutionArn, batcher) + : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) { _func = func; _config = config; @@ -182,6 +183,7 @@ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellat var startUpdate = new SdkOperationUpdate { Id = OperationId, + ParentId = ParentId, Type = OperationTypes.Step, Action = OperationAction.START, SubType = OperationSubTypes.Step, @@ -207,6 +209,7 @@ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellat await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, + ParentId = ParentId, Type = OperationTypes.Step, Action = OperationAction.SUCCEED, SubType = OperationSubTypes.Step, @@ -254,6 +257,7 @@ private async Task HandleStepFailureAsync(Exception ex, int attemptNumber, Ca await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, + ParentId = ParentId, Type = OperationTypes.Step, Action = OperationAction.RETRY, SubType = OperationSubTypes.Step, @@ -269,6 +273,7 @@ await EnqueueAsync(new SdkOperationUpdate await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, + ParentId = ParentId, Type = OperationTypes.Step, Action = OperationAction.FAIL, SubType = OperationSubTypes.Step, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs index 2c1325974..b8bb8cb5f 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs @@ -28,12 +28,13 @@ internal sealed class WaitOperation : DurableOperation public WaitOperation( string operationId, string? name, + string? parentId, int waitSeconds, ExecutionState state, TerminationManager termination, string durableExecutionArn, CheckpointBatcher? batcher = null) - : base(operationId, name, state, termination, durableExecutionArn, batcher) + : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) { _waitSeconds = waitSeconds; } @@ -47,6 +48,7 @@ public WaitOperation( await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, + ParentId = ParentId, Type = OperationTypes.Wait, Action = OperationAction.START, SubType = OperationSubTypes.Wait, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs index 4a3f9d6a7..e0a0ffbf4 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs @@ -7,6 +7,7 @@ using StepDetails = Amazon.Lambda.DurableExecution.StepDetails; using WaitDetails = Amazon.Lambda.DurableExecution.WaitDetails; using ExecutionDetails = Amazon.Lambda.DurableExecution.ExecutionDetails; +using ContextDetails = Amazon.Lambda.DurableExecution.ContextDetails; namespace Amazon.Lambda.DurableExecution.Services; @@ -116,7 +117,9 @@ private static Operation MapFromSdkOperation(SdkOperation sdkOp) Error = sdkOp.StepDetails.Error != null ? new ErrorObject { ErrorType = sdkOp.StepDetails.Error.ErrorType, - ErrorMessage = sdkOp.StepDetails.Error.ErrorMessage + ErrorMessage = sdkOp.StepDetails.Error.ErrorMessage, + StackTrace = sdkOp.StepDetails.Error.StackTrace, + ErrorData = sdkOp.StepDetails.Error.ErrorData } : null, Attempt = sdkOp.StepDetails.Attempt, NextAttemptTimestamp = sdkOp.StepDetails.NextAttemptTimestamp.HasValue @@ -132,6 +135,17 @@ private static Operation MapFromSdkOperation(SdkOperation sdkOp) ExecutionDetails = sdkOp.ExecutionDetails != null ? new ExecutionDetails { InputPayload = sdkOp.ExecutionDetails.InputPayload + } : null, + ContextDetails = sdkOp.ContextDetails != null ? new ContextDetails + { + Result = sdkOp.ContextDetails.Result, + Error = sdkOp.ContextDetails.Error != null ? new ErrorObject + { + ErrorType = sdkOp.ContextDetails.Error.ErrorType, + ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage, + StackTrace = sdkOp.ContextDetails.Error.StackTrace, + ErrorData = sdkOp.ContextDetails.Error.ErrorData + } : null } : null }; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextFailsTest.cs new file mode 100644 index 000000000..613011df9 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextFailsTest.cs @@ -0,0 +1,93 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class ChildContextFailsTest +{ + private readonly ITestOutputHelper _output; + public ChildContextFailsTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end RunInChildContextAsync failure path: the user func inside the + /// child throws, the SDK emits a CONTEXT FAIL checkpoint, the child's prior + /// inner step is preserved, and the workflow is marked FAILED with the + /// original exception details surfaced via ContextFailedDetails.Error. + /// + [Fact] + public async Task ChildContext_FailureSurfacesAsContextFailed() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("ChildContextFailsFunction"), + "childctxfail", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "integ-test-fail"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + // Failed workflows return null payload; locate the execution by name. + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60)); + Assert.Equal("FAILED", status, ignoreCase: true); + + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Error); + Assert.Contains("intentional child context failure", execution.Error.ErrorMessage); + + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.ContextStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.ContextFailed) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var contextStarted = events.SingleOrDefault(e => e.EventType == EventType.ContextStarted && e.Name == "phase"); + Assert.NotNull(contextStarted); + Assert.Equal("OrderProcessing", contextStarted!.SubType); + // The child context op itself is at root — its boundary opens at the parent scope. + Assert.Null(contextStarted.ParentId); + + // The CONTEXT FAIL record carries the original exception details and + // closes the boundary back at the parent scope (root, ParentId=null). + var contextFailed = events.SingleOrDefault(e => e.EventType == EventType.ContextFailed && e.Name == "phase"); + Assert.NotNull(contextFailed); + Assert.Null(contextFailed!.ParentId); + var error = contextFailed.ContextFailedDetails.Error?.Payload; + Assert.NotNull(error); + Assert.Contains("intentional child context failure", error!.ErrorMessage ?? string.Empty); + Assert.Equal(typeof(InvalidOperationException).FullName, error.ErrorType); + // The wire ErrorObject preserves StackTrace from ToSdkError end-to-end — + // the service stores it and returns it on replay (or directly in the + // history event), so user-facing ChildContextException.OriginalStackTrace + // is populated rather than dropped. + Assert.NotNull(error.StackTrace); + Assert.NotEmpty(error.StackTrace); + + // The step that ran before the throw was checkpointed under the child. + var contextOpId = contextStarted.Id; + var innerStep = events.SingleOrDefault( + e => e.StepSucceededDetails != null && e.Name == "prepare" && e.ParentId == contextOpId); + Assert.NotNull(innerStep); + Assert.Equal("\"prepared-integ-test-fail\"", innerStep!.StepSucceededDetails.Result?.Payload); + + // Every inner step/wait event for this workflow is parented under the + // child context — the child is a single observability boundary. + var innerOpEvents = events + .Where(e => e.StepStartedDetails != null + || e.StepSucceededDetails != null + || e.StepFailedDetails != null + || e.WaitStartedDetails != null + || e.WaitSucceededDetails != null) + .ToList(); + Assert.NotEmpty(innerOpEvents); + Assert.All(innerOpEvents, e => Assert.Equal(contextOpId, e.ParentId)); + + // The child never reached SUCCEED; the workflow body past the throw is unreachable. + Assert.DoesNotContain(events, e => e.EventType == EventType.ContextSucceeded); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextRetryFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextRetryFailsTest.cs new file mode 100644 index 000000000..688336b02 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextRetryFailsTest.cs @@ -0,0 +1,111 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class ChildContextRetryFailsTest +{ + private readonly ITestOutputHelper _output; + public ChildContextRetryFailsTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end: a step inside a child context retries until exhausted, then + /// the child closes as ContextFailed. Validates the child is a single + /// retry/error boundary — every per-attempt StepStarted/StepFailed (and the + /// terminal ContextFailed's surfaced exception) reflect the same logical + /// failure under the same parent op id. + /// + [Fact] + public async Task ChildContext_RetryExhaustionInsideChild_AllAttemptsParentedUnderChild() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("ChildContextRetryFailsFunction"), + "childctxretry", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "integ-test-retry"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // 3 attempts with 2s + 4s retry delays plus service-driven re-invokes. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("FAILED", status, ignoreCase: true); + + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Error); + Assert.Contains("always-fails", execution.Error.ErrorMessage); + + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.ContextStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.ContextFailed) ?? false) + && (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3, + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var contextStarted = events.SingleOrDefault(e => e.EventType == EventType.ContextStarted && e.Name == "phase"); + Assert.NotNull(contextStarted); + var contextOpId = contextStarted!.Id; + Assert.NotNull(contextOpId); + + // All 3 step attempts (with their per-attempt StepFailed records) ran + // inside the child boundary. + var stepStarted = events.Where(e => e.EventType == EventType.StepStarted && e.Name == "always_fails").ToList(); + Assert.Equal(3, stepStarted.Count); + Assert.All(stepStarted, e => Assert.Equal(contextOpId, e.ParentId)); + + var stepFailed = events.Where(e => e.StepFailedDetails != null && e.Name == "always_fails").ToList(); + Assert.Equal(3, stepFailed.Count); + Assert.All(stepFailed, e => Assert.Equal(contextOpId, e.ParentId)); + + // The per-attempt failure messages reflect the user's exception. + var failureMessages = stepFailed + .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty) + .ToList(); + Assert.Contains(failureMessages, m => m.Contains("attempt 1")); + Assert.Contains(failureMessages, m => m.Contains("attempt 2")); + Assert.Contains(failureMessages, m => m.Contains("attempt 3")); + + // Each StepFailed event preserves StackTrace through the wire — proves + // StepDetails.Error mapping doesn't drop frames. + Assert.All(stepFailed, e => + { + var stack = e.StepFailedDetails.Error?.Payload?.StackTrace; + Assert.NotNull(stack); + Assert.NotEmpty(stack); + }); + + // The child closes the boundary at the parent scope (root) and surfaces + // the underlying exception type — a single retry/error envelope. + var contextFailed = events.SingleOrDefault(e => e.EventType == EventType.ContextFailed && e.Name == "phase"); + Assert.NotNull(contextFailed); + Assert.Null(contextFailed!.ParentId); + var contextError = contextFailed.ContextFailedDetails.Error?.Payload; + Assert.NotNull(contextError); + Assert.Contains("always-fails", contextError!.ErrorMessage ?? string.Empty); + // StackTrace round-trips end-to-end — the service preserves it from the + // checkpointed FAIL update and returns it on replay/history. + Assert.NotNull(contextError.StackTrace); + Assert.NotEmpty(contextError.StackTrace); + + Assert.DoesNotContain(events, e => e.StepSucceededDetails != null); + Assert.DoesNotContain(events, e => e.EventType == EventType.ContextSucceeded); + + // Service honored retry delays: with 2s + 4s and no jitter, the gap + // between first and last StepStarted should be >= 6s. + var startedTimestamps = stepStarted + .Where(e => e.EventTimestamp.HasValue) + .OrderBy(e => e.EventTimestamp!.Value) + .Select(e => e.EventTimestamp!.Value) + .ToList(); + var totalGap = startedTimestamps[^1] - startedTimestamps[0]; + _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s"); + Assert.True(totalGap >= TimeSpan.FromSeconds(6), + $"Service did not honor retry delays inside child: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)"); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextTest.cs new file mode 100644 index 000000000..3c61bc4c0 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ChildContextTest.cs @@ -0,0 +1,109 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class ChildContextTest +{ + private readonly ITestOutputHelper _output; + public ChildContextTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end RunInChildContextAsync: the workflow runs a child context that + /// performs step + wait + step and returns a typed result. The unit tests + /// fake state transitions in-memory; this test verifies the service actually + /// round-trips CONTEXT START/SUCCEED records, parents the inner step/wait + /// events under the context op, and persists the child's return value as + /// the ContextSucceeded payload. + /// + [Fact] + public async Task ChildContext_CompletesViaService() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("ChildContextFunction"), + "childctx", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "integ-test-456"}"""); + Assert.Equal(200, invokeResponse.StatusCode); + + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.ContextStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.ContextSucceeded) ?? false) + && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 + && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + // Exactly one child context was opened and closed successfully. + var contextStarted = events.SingleOrDefault(e => e.EventType == EventType.ContextStarted && e.Name == "phase"); + Assert.NotNull(contextStarted); + Assert.Equal("OrderProcessing", contextStarted!.SubType); + + // The child boundary opens and closes at the parent scope (root, ParentId=null). + Assert.Null(contextStarted.ParentId); + + var contextSucceeded = events.SingleOrDefault(e => e.EventType == EventType.ContextSucceeded && e.Name == "phase"); + Assert.NotNull(contextSucceeded); + Assert.Null(contextSucceeded!.ParentId); + + // The child's return value was checkpointed as the CONTEXT SUCCEED payload. + Assert.Equal( + "\"processed-validated-integ-test-456\"", + contextSucceeded.ContextSucceededDetails.Result?.Payload); + + // Inner operations are parented to the context op so the service + // visualizes them nested under the child. + var contextOpId = contextStarted.Id; + Assert.NotNull(contextOpId); + + var innerStepEvents = events + .Where(e => e.EventType == EventType.StepStarted && e.ParentId == contextOpId) + .OrderBy(e => e.EventTimestamp) + .ToList(); + Assert.Equal(2, innerStepEvents.Count); + Assert.Equal("validate", innerStepEvents[0].Name); + Assert.Equal("process", innerStepEvents[1].Name); + + var innerWaitStarted = events.SingleOrDefault( + e => e.WaitStartedDetails != null && e.Name == "short_wait" && e.ParentId == contextOpId); + Assert.NotNull(innerWaitStarted); + Assert.Equal(2, innerWaitStarted!.WaitStartedDetails.Duration); + + // Inner step results chain: validate -> wait -> process. + var stepResults = events + .Where(e => e.StepSucceededDetails != null && e.ParentId == contextOpId) + .OrderBy(e => e.EventTimestamp) + .Select(e => (Name: e.Name, Payload: e.StepSucceededDetails.Result?.Payload?.Trim('"'))) + .ToList(); + Assert.Equal(2, stepResults.Count); + Assert.Equal("validate", stepResults[0].Name); + Assert.Equal("validated-integ-test-456", stepResults[0].Payload); + Assert.Equal("process", stepResults[1].Name); + Assert.Equal("processed-validated-integ-test-456", stepResults[1].Payload); + + // Every inner step/wait event for this workflow is parented under the + // child context — the child is a single observability boundary. + var innerOpEvents = events + .Where(e => e.StepStartedDetails != null + || e.StepSucceededDetails != null + || e.StepFailedDetails != null + || e.WaitStartedDetails != null + || e.WaitSucceededDetails != null) + .ToList(); + Assert.NotEmpty(innerOpEvents); + Assert.All(innerOpEvents, e => Assert.Equal(contextOpId, e.ParentId)); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/ChildContextFailsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/ChildContextFailsFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/ChildContextFailsFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs new file mode 100644 index 000000000..9a08842ea --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs @@ -0,0 +1,45 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Throw inside a child context to validate the CONTEXT FAIL path: the + // service must record a ContextFailed event with the error details and + // mark the workflow FAILED. + await context.RunInChildContextAsync( + async (childCtx) => + { + await childCtx.StepAsync( + async (_) => { await Task.CompletedTask; return $"prepared-{input.OrderId}"; }, + name: "prepare"); + + throw new InvalidOperationException("intentional child context failure for integration test"); + }, + name: "phase", + config: new ChildContextConfig { SubType = "OrderProcessing" }); + + return new TestResult { Status = "should_not_reach" }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/ChildContextFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/ChildContextFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/ChildContextFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs new file mode 100644 index 000000000..624b55a1a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs @@ -0,0 +1,51 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Run a child context that itself does step + wait + step. The child's + // return value is checkpointed at the parent level as a CONTEXT + // SUCCEED record, so on replay we'd see it returned from cache. + var phaseResult = await context.RunInChildContextAsync( + async (childCtx) => + { + var validated = await childCtx.StepAsync( + async (_) => { await Task.CompletedTask; return $"validated-{input.OrderId}"; }, + name: "validate"); + + await childCtx.WaitAsync(TimeSpan.FromSeconds(2), name: "short_wait"); + + var processed = await childCtx.StepAsync( + async (_) => { await Task.CompletedTask; return $"processed-{validated}"; }, + name: "process"); + + return processed; + }, + name: "phase", + config: new ChildContextConfig { SubType = "OrderProcessing" }); + + return new TestResult { Status = "completed", Data = phaseResult }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/ChildContextRetryFailsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/ChildContextRetryFailsFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/ChildContextRetryFailsFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs new file mode 100644 index 000000000..305311c2f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs @@ -0,0 +1,58 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // A retry-then-exhaust step inside a child context: every retry + // checkpoint should be parented under the child, and the child should + // close as ContextFailed when retries are exhausted — proving the + // child is a single retry/error boundary. + await context.RunInChildContextAsync( + async (childCtx) => + { + return await childCtx.StepAsync( + async (ctx) => + { + await Task.CompletedTask; + throw new InvalidOperationException( + $"always-fails on attempt {ctx.AttemptNumber} for {input.OrderId}"); + }, + name: "always_fails", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(10), + backoffRate: 2.0, + jitter: JitterStrategy.None) + }); + }, + name: "phase", + config: new ChildContextConfig { SubType = "OrderProcessing" }); + + return new TestResult { Status = "should_not_reach" }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs new file mode 100644 index 000000000..539bfff0e --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs @@ -0,0 +1,473 @@ +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.TestUtilities; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class ChildContextOperationTests +{ + /// Reproduces the Id that emits for the n-th root-level operation. + private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString()); + + /// The hashed ID of the n-th child operation under . + private static string ChildIdAt(string parentOpId, int position) => + OperationIdGenerator.HashOperationId($"{parentOpId}-{position}"); + + private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state) + CreateContext(InitialExecutionState? initialState = null) + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(initialState); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); +#pragma warning disable AWSLAMBDA001 // TestLambdaContext.Serializer is experimental. + var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() }; +#pragma warning restore AWSLAMBDA001 + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + return (context, recorder, tm, state); + } + + [Fact] + public async Task RunInChildContextAsync_FreshExecution_RunsFuncAndCheckpoints() + { + var (context, recorder, tm, _) = CreateContext(); + + var executed = false; + var result = await context.RunInChildContextAsync( + async (childCtx) => + { + executed = true; + return await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "inner"; }, name: "inner_step"); + }, + name: "phase"); + + Assert.True(executed); + Assert.Equal("inner", result); + Assert.False(tm.IsTerminated); + + // CONTEXT START → STEP START (fire-and-forget, but flushed before drain) + // → STEP SUCCEED → CONTEXT SUCCEED + await recorder.Batcher.DrainAsync(); + + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] + { + "CONTEXT:START", + "STEP:START", + "STEP:SUCCEED", + "CONTEXT:SUCCEED" + }, actions); + + var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED"); + Assert.Equal(IdAt(1), contextSucceed.Id); + Assert.Equal("phase", contextSucceed.Name); + Assert.Equal("\"inner\"", contextSucceed.Payload); + } + + [Fact] + public async Task RunInChildContextAsync_FreshExecution_ChildOperationIdsDeterministic() + { + var (context, recorder, _, _) = CreateContext(); + + await context.RunInChildContextAsync( + async (childCtx) => + { + await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "a"; }, name: "first"); + await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "b"; }, name: "second"); + return 0; + }, + name: "phase"); + + await recorder.Batcher.DrainAsync(); + + var parentOpId = IdAt(1); + var firstChildOpId = ChildIdAt(parentOpId, 1); + var secondChildOpId = ChildIdAt(parentOpId, 2); + + var stepStarts = recorder.Flushed.Where(o => o.Type == "STEP" && o.Action == "START").ToArray(); + Assert.Equal(2, stepStarts.Length); + Assert.Equal(firstChildOpId, stepStarts[0].Id); + Assert.Equal(secondChildOpId, stepStarts[1].Id); + } + + [Fact] + public async Task RunInChildContextAsync_ReplaySucceeded_ReturnsCachedAndDoesNotRun() + { + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + Name = "phase", + ContextDetails = new ContextDetails { Result = "\"cached\"" } + } + } + }); + + var executed = false; + var result = await context.RunInChildContextAsync( + async (childCtx) => + { + executed = true; + await Task.CompletedTask; + return "fresh"; + }, + name: "phase"); + + Assert.False(executed); + Assert.Equal("cached", result); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayFailed_ThrowsChildContextException() + { + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Context, + Status = OperationStatuses.Failed, + Name = "phase", + SubType = "WaitForCallback", + ContextDetails = new ContextDetails + { + Error = new ErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "child went wrong", + ErrorData = "{\"detail\":\"x\"}", + StackTrace = new[] { "at A.B()", "at C.D()" } + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "should not run"; }, + name: "phase")); + + Assert.Equal("child went wrong", ex.Message); + Assert.Equal("System.InvalidOperationException", ex.ErrorType); + Assert.Equal("{\"detail\":\"x\"}", ex.ErrorData); + Assert.Equal("WaitForCallback", ex.SubType); + Assert.NotNull(ex.OriginalStackTrace); + Assert.Equal(2, ex.OriginalStackTrace!.Count); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayFailed_AppliesErrorMapping() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Context, + Status = OperationStatuses.Failed, + Name = "phase", + ContextDetails = new ContextDetails + { + Error = new ErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "boom" + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "x"; }, + name: "phase", + config: new ChildContextConfig + { + // Mapper sees the ChildContextException and remaps to a + // domain-specific exception, preserving the original via + // InnerException. + ErrorMapping = e => new InvalidOperationException("mapped", e) + })); + + Assert.Equal("mapped", ex.Message); + Assert.IsType(ex.InnerException); + } + + [Fact] + public async Task RunInChildContextAsync_FuncThrows_CheckpointsFailAndThrows() + { + var (context, recorder, _, _) = CreateContext(); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; throw new InvalidOperationException("inner boom"); }, + name: "phase")); + + Assert.Equal("inner boom", ex.Message); + Assert.Equal("System.InvalidOperationException", ex.ErrorType); + + await recorder.Batcher.DrainAsync(); + var contextActions = recorder.Flushed + .Where(o => o.Type == "CONTEXT") + .Select(o => o.Action.ToString()) + .ToArray(); + Assert.Equal(new[] { "START", "FAIL" }, contextActions); + } + + [Fact] + public async Task RunInChildContextAsync_FuncThrows_AppliesErrorMapping() + { + var (context, _, _, _) = CreateContext(); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; throw new TimeoutException("inner timeout"); }, + name: "phase", + config: new ChildContextConfig + { + ErrorMapping = e => new InvalidOperationException("mapped", e) + })); + + Assert.Equal("mapped", ex.Message); + Assert.IsType(ex.InnerException); + } + + [Fact] + public async Task RunInChildContextAsync_ChildSuspendsOnWait_TerminatesWithWaitScheduled() + { + var (context, recorder, tm, _) = CreateContext(); + + // Suspending child: the inner Wait flushes WAIT START sync, then + // returns a never-completing Task via TerminationManager.SuspendAndAwait. + // The outer ChildContextOperation awaits that and never reaches + // CONTEXT SUCCEED. DurableExecutionHandler.RunAsync's WhenAny race + // wins on the termination signal; the test below short-circuits via + // the same TerminationManager.IsTerminated check. + var task = context.RunInChildContextAsync( + async (childCtx) => + { + await childCtx.WaitAsync(TimeSpan.FromSeconds(5), name: "wait_inside"); + return "should not return"; + }, + name: "phase"); + + await Task.Delay(50); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + // CONTEXT START + WAIT START have flushed; no SUCCEED/FAIL since the + // child is suspended. + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Contains("CONTEXT:START", actions); + Assert.Contains("WAIT:START", actions); + Assert.DoesNotContain("CONTEXT:SUCCEED", actions); + Assert.DoesNotContain("CONTEXT:FAIL", actions); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayStarted_ReExecutesFuncWithInnerCacheReplay() + { + var parentOpId = IdAt(1); + var innerStepOpId = ChildIdAt(parentOpId, 1); + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = parentOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Started, + Name = "phase" + }, + new() + { + Id = innerStepOpId, + Type = OperationTypes.Step, + Status = OperationStatuses.Succeeded, + Name = "inner_step", + StepDetails = new StepDetails { Result = "\"cached_inner\"" } + } + } + }); + + var innerExecuted = false; + var result = await context.RunInChildContextAsync( + async (childCtx) => + { + return await childCtx.StepAsync( + async (_) => { innerExecuted = true; await Task.CompletedTask; return "fresh_inner"; }, + name: "inner_step"); + }, + name: "phase"); + + // The user func re-runs (replay propagation), but its inner step + // replays the cached value without invoking the inner code. + Assert.False(innerExecuted); + Assert.Equal("cached_inner", result); + + await recorder.Batcher.DrainAsync(); + + // Critical: do NOT re-checkpoint CONTEXT START on replay. The original + // STARTED checkpoint is still authoritative. + Assert.DoesNotContain(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "START"); + + // The CONTEXT SUCCEED happens only this time, since the user func + // returned successfully. + Assert.Contains(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "SUCCEED"); + } + + [Fact] + public async Task RunInChildContextAsync_VoidOverload_RunsAndCheckpoints() + { + var (context, recorder, _, _) = CreateContext(); + + var executed = false; + await context.RunInChildContextAsync( + async (childCtx) => + { + await childCtx.StepAsync( + async (_) => { executed = true; await Task.CompletedTask; }, + name: "inner_void"); + }, + name: "phase"); + + Assert.True(executed); + + await recorder.Batcher.DrainAsync(); + + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] + { + "CONTEXT:START", + "STEP:START", + "STEP:SUCCEED", + "CONTEXT:SUCCEED" + }, actions); + + // Void overload uses NullCheckpointSerializer → "null" payload. + var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED"); + Assert.Equal("null", contextSucceed.Payload); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayTypeMismatch_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, // wrong type — should be CONTEXT + Status = OperationStatuses.Succeeded, + Name = "phase", + StepDetails = new StepDetails { Result = "\"x\"" } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "x"; }, + name: "phase")); + + Assert.Contains("expected type 'CONTEXT'", ex.Message); + Assert.Contains("found 'STEP'", ex.Message); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayNameMismatch_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + Name = "old_name", + ContextDetails = new ContextDetails { Result = "\"x\"" } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "x"; }, + name: "new_name")); + + Assert.Contains("expected name 'new_name'", ex.Message); + Assert.Contains("found 'old_name'", ex.Message); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayUnknownStatus_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Context, + Status = "BOGUS", + Name = "phase" + } + } + }); + + await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "x"; }, + name: "phase")); + } + + [Fact] + public async Task RunInChildContextAsync_SubTypeAndName_PropagateToCheckpoint() + { + var (context, recorder, _, _) = CreateContext(); + + await context.RunInChildContextAsync( + async (_) => { await Task.CompletedTask; return "ok"; }, + name: "phase", + config: new ChildContextConfig { SubType = "WaitForCallback" }); + + await recorder.Batcher.DrainAsync(); + + var contextOps = recorder.Flushed.Where(o => o.Type == "CONTEXT").ToArray(); + Assert.Equal(2, contextOps.Length); + foreach (var op in contextOps) + { + Assert.Equal("WaitForCallback", op.SubType); + Assert.Equal("phase", op.Name); + } + } + +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs index 50e511cb1..c694573f5 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs @@ -95,7 +95,9 @@ public async Task StepAsync_ReplayFailed_ThrowsStepException() Error = new ErrorObject { ErrorType = "System.TimeoutException", - ErrorMessage = "timed out" + ErrorMessage = "timed out", + ErrorData = "{\"detail\":\"x\"}", + StackTrace = new[] { "at A.B()", "at C.D()" } } } } @@ -107,6 +109,9 @@ public async Task StepAsync_ReplayFailed_ThrowsStepException() Assert.Equal("System.TimeoutException", ex.ErrorType); Assert.Equal("timed out", ex.Message); + Assert.Equal("{\"detail\":\"x\"}", ex.ErrorData); + Assert.NotNull(ex.OriginalStackTrace); + Assert.Equal(2, ex.OriginalStackTrace!.Count); } [Fact] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs index 2326f8544..0a85f8b24 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs @@ -177,6 +177,109 @@ await client.CheckpointAsync( Assert.Equal("WAIT", call.Updates[1].Type); } + [Fact] + public async Task GetExecutionStateAsync_CopiesContextDetailsResultAndError() + { + var mockClient = new MockLambdaClient + { + GetExecutionStateHandler = _ => new GetDurableExecutionStateResponse + { + Operations = new List + { + new Amazon.Lambda.Model.Operation + { + Id = "ctx-1", + Type = "CONTEXT", + Status = "SUCCEEDED", + Name = "phase", + ContextDetails = new Amazon.Lambda.Model.ContextDetails + { + Result = "\"ok\"" + } + }, + new Amazon.Lambda.Model.Operation + { + Id = "ctx-2", + Type = "CONTEXT", + Status = "FAILED", + Name = "phase2", + ContextDetails = new Amazon.Lambda.Model.ContextDetails + { + Error = new SdkErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "boom", + ErrorData = "{\"detail\":\"x\"}", + StackTrace = new List { "at A.B()", "at C.D()" } + } + } + } + } + } + }; + var client = new LambdaDurableServiceClient(mockClient); + + var (operations, _) = await client.GetExecutionStateAsync("arn", "tok", "marker"); + + Assert.Equal(2, operations.Count); + + Assert.NotNull(operations[0].ContextDetails); + Assert.Equal("\"ok\"", operations[0].ContextDetails!.Result); + Assert.Null(operations[0].ContextDetails!.Error); + + Assert.NotNull(operations[1].ContextDetails); + Assert.NotNull(operations[1].ContextDetails!.Error); + Assert.Equal("System.InvalidOperationException", operations[1].ContextDetails!.Error!.ErrorType); + Assert.Equal("boom", operations[1].ContextDetails!.Error!.ErrorMessage); + Assert.Equal("{\"detail\":\"x\"}", operations[1].ContextDetails!.Error!.ErrorData); + Assert.Equal(new[] { "at A.B()", "at C.D()" }, operations[1].ContextDetails!.Error!.StackTrace); + } + + [Fact] + public async Task GetExecutionStateAsync_CopiesStepDetailsErrorStackTraceAndErrorData() + { + // Round-trip safety: the SDK returns ErrorObject with all four fields, + // and Internal.Operation must preserve them so StepException can surface + // OriginalStackTrace / ErrorData on replay. + var mockClient = new MockLambdaClient + { + GetExecutionStateHandler = _ => new GetDurableExecutionStateResponse + { + Operations = new List + { + new Amazon.Lambda.Model.Operation + { + Id = "step-1", + Type = "STEP", + Status = "FAILED", + Name = "charge", + StepDetails = new Amazon.Lambda.Model.StepDetails + { + Error = new SdkErrorObject + { + ErrorType = "System.TimeoutException", + ErrorMessage = "timed out", + ErrorData = "{\"detail\":\"y\"}", + StackTrace = new List { "at E.F()", "at G.H()" } + } + } + } + } + } + }; + var client = new LambdaDurableServiceClient(mockClient); + + var (operations, _) = await client.GetExecutionStateAsync("arn", "tok", "marker"); + + var op = Assert.Single(operations); + Assert.NotNull(op.StepDetails); + Assert.NotNull(op.StepDetails!.Error); + Assert.Equal("System.TimeoutException", op.StepDetails!.Error!.ErrorType); + Assert.Equal("timed out", op.StepDetails!.Error!.ErrorMessage); + Assert.Equal("{\"detail\":\"y\"}", op.StepDetails!.Error!.ErrorData); + Assert.Equal(new[] { "at E.F()", "at G.H()" }, op.StepDetails!.Error!.StackTrace); + } + [Fact] public async Task CheckpointAsync_ReturnsNewToken() { From e34c6c846b0c47cf4ba4574be66506bb5359fb3e Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Fri, 22 May 2026 12:11:22 -0400 Subject: [PATCH 2/2] phil comment --- .../Internal/ChildContextOperation.cs | 20 +++++++- .../ChildContextOperationTests.cs | 51 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs index 1ed6fce7c..507cafc0f 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs @@ -122,6 +122,23 @@ private async Task ExecuteFunc(CancellationToken cancellationToken) { throw; } + catch (NonDeterministicExecutionException) + { + // Replay-mismatch from an inner operation means the entire execution + // is corrupt — checkpointing this as CONTEXT FAIL would freeze the + // mismatch into history and prevent future invocations from + // re-detecting it. Bubble up untouched. + throw; + } + catch (StepInterruptedException) + { + // AtMostOncePerRetry crash recovery: a step inside the child saw a + // STARTED checkpoint with no terminal record and routed through its + // retry strategy. The step has already checkpointed its own outcome; + // wrapping this as CONTEXT FAIL would mask that. Bubble up so the + // step's strategy / replay flow stays authoritative. + throw; + } catch (Exception ex) { await EnqueueAsync(new SdkOperationUpdate @@ -138,7 +155,8 @@ await EnqueueAsync(new SdkOperationUpdate throw MapFailureException(new ChildContextException(ex.Message, ex) { SubType = _config?.SubType, - ErrorType = ex.GetType().FullName + ErrorType = ex.GetType().FullName, + OriginalStackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList() }); } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs index 539bfff0e..39e9464be 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs @@ -225,6 +225,10 @@ public async Task RunInChildContextAsync_FuncThrows_CheckpointsFailAndThrows() Assert.Equal("inner boom", ex.Message); Assert.Equal("System.InvalidOperationException", ex.ErrorType); + // Fresh-path failures populate OriginalStackTrace alongside ErrorType so + // ErrorMapping callbacks see the same shape on both fresh and replay paths. + Assert.NotNull(ex.OriginalStackTrace); + Assert.NotEmpty(ex.OriginalStackTrace!); await recorder.Batcher.DrainAsync(); var contextActions = recorder.Flushed @@ -234,6 +238,50 @@ public async Task RunInChildContextAsync_FuncThrows_CheckpointsFailAndThrows() Assert.Equal(new[] { "START", "FAIL" }, contextActions); } + [Fact] + public async Task RunInChildContextAsync_InnerNonDeterminism_BubblesUpWithoutCheckpointingFail() + { + // A child context whose inner step's checkpoint type doesn't match the + // user code (replay mismatch) must NOT be wrapped/checkpointed as + // CONTEXT FAIL — that would freeze the corruption into history. + var parentOpId = IdAt(1); + var innerOpId = ChildIdAt(parentOpId, 1); + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = parentOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Started, + Name = "phase" + }, + new() + { + Id = innerOpId, + Type = OperationTypes.Wait, // wrong type — code calls StepAsync + Status = OperationStatuses.Succeeded, + Name = "inner_step" + } + } + }); + + await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (childCtx) => + { + return await childCtx.StepAsync( + async (_) => { await Task.CompletedTask; return "x"; }, + name: "inner_step"); + }, + name: "phase")); + + await recorder.Batcher.DrainAsync(); + Assert.DoesNotContain(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "FAIL"); + } + [Fact] public async Task RunInChildContextAsync_FuncThrows_AppliesErrorMapping() { @@ -367,7 +415,8 @@ await childCtx.StepAsync( "CONTEXT:SUCCEED" }, actions); - // Void overload uses NullCheckpointSerializer → "null" payload. + // Void overload returns a null object, which the registered + // ILambdaSerializer serializes as the literal "null" payload. var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED"); Assert.Equal("null", contextSucceed.Payload); }