Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ global.json

**/cdk.out/**
**/.DS_Store

# JetBrains Rider per-project cache
**/*.lscache
344 changes: 277 additions & 67 deletions Docs/durable-execution-design.md

Large diffs are not rendered by default.

62 changes: 61 additions & 1 deletion Libraries/Libraries.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 18
VisualStudioVersion = 18.5.11709.299 stable
VisualStudioVersion = 18.5.11709.299
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}"
EndProject
Expand Down Expand Up @@ -155,6 +155,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ResponseStreamingFunctionHa
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AspNetCoreStreamingApiGatewayTest", "test\Amazon.Lambda.RuntimeSupport.Tests\AspNetCoreStreamingApiGatewayTest\AspNetCoreStreamingApiGatewayTest.csproj", "{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution", "src\Amazon.Lambda.DurableExecution\Amazon.Lambda.DurableExecution.csproj", "{9097B5A4-E100-47FD-A676-0B666A36FAFF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.Tests", "test\Amazon.Lambda.DurableExecution.Tests\Amazon.Lambda.DurableExecution.Tests.csproj", "{57150BA6-3826-431F-8F58-B1D11FAFC5D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.IntegrationTests", "test\Amazon.Lambda.DurableExecution.IntegrationTests\Amazon.Lambda.DurableExecution.IntegrationTests.csproj", "{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.AotPublishTest", "test\Amazon.Lambda.DurableExecution.AotPublishTest\Amazon.Lambda.DurableExecution.AotPublishTest.csproj", "{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -969,6 +977,54 @@ Global
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x64.Build.0 = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.ActiveCfg = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1045,6 +1101,10 @@ Global
{80594C21-C6EB-469E-83CC-68F9F661CA5E} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{9097B5A4-E100-47FD-A676-0B666A36FAFF} = {AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}
{57150BA6-3826-431F-8F58-B1D11FAFC5D4} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {503678A4-B8D1-4486-8915-405A3E9CF0EB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
<!-- DurableExecution intentionally consumes the preview ILambdaContext.Serializer
API. The whole package is in development (0.x), so suppressing project-wide
is appropriate; downstream users still see AWSLAMBDA001 in their own code. -->
<NoWarn>$(NoWarn);AWSLAMBDA001</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for a child context.
/// </summary>
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (and overloads) to run code inside one.
/// </remarks>
public sealed class ChildContextConfig
{
/// <summary>
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
/// </summary>
public string? SubType { get; set; }

/// <summary>
/// Optional function to transform exceptions thrown by the child context's
/// user function before they surface to the caller. Useful for wrapping
/// low-level errors into domain-specific exceptions.
/// </summary>
/// <remarks>
/// Applied when the user function throws (the mapped exception propagates
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
/// <c>FAILED</c> child context (the constructed
/// <see cref="ChildContextException"/> is mapped before being thrown).
/// </remarks>
public Func<Exception, Exception>? ErrorMapping { get; set; }
}
219 changes: 219 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches
/// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>);
/// the replay logic lives in those classes.
/// </summary>
internal sealed class DurableContext : IDurableContext
{
private readonly ExecutionState _state;
private readonly TerminationManager _terminationManager;
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;

public DurableContext(
ExecutionState state,
TerminationManager terminationManager,
OperationIdGenerator idGenerator,
string durableExecutionArn,
ILambdaContext lambdaContext,
CheckpointBatcher? batcher = null)
{
_state = state;
_terminationManager = terminationManager;
_idGenerator = idGenerator;
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
LambdaContext = lambdaContext;
}

// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
public ILogger Logger => NullLogger.Instance;
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
public ILambdaContext LambdaContext { get; }

public Task<T> StepAsync<T>(
Func<IStepContext, Task<T>> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
=> RunStep(func, name, config, cancellationToken);

public async Task StepAsync(
Func<IStepContext, Task> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void steps don't carry a meaningful payload — wrap with an object?-typed
// step that always returns null. The serializer isn't actually invoked
// with a non-null value, so any registered ILambdaSerializer suffices.
await RunStep<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}

private Task<T> RunStep<T>(
Func<IStepContext, Task<T>> func,
string? name,
StepConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
"(or in tests, set TestLambdaContext.Serializer).");

var operationId = _idGenerator.NextId();
var op = new StepOperation<T>(
operationId, name, func, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task WaitAsync(
TimeSpan duration,
string? name = null,
CancellationToken cancellationToken = default)
{
// Service timer granularity is 1 second; sub-second waits would round to 0.
// WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year).
if (duration < TimeSpan.FromSeconds(1))
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second.");

if (duration > TimeSpan.FromSeconds(31_622_400))
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year).");

cancellationToken.ThrowIfCancellationRequested();

var operationId = _idGenerator.NextId();
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
var op = new WaitOperation(
operationId, name, waitSeconds,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
=> RunChildContext(func, name, config, cancellationToken);

public async Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void child contexts don't carry a meaningful payload; the wrapper
// returns null so the registered ILambdaSerializer is never asked to
// serialize a real value.
await RunChildContext<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}

private Task<T> RunChildContext<T>(
Func<IDurableContext, Task<T>> func,
string? name,
ChildContextConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
"(or in tests, set TestLambdaContext.Serializer).");

var operationId = _idGenerator.NextId();

// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);

var op = new ChildContextOperation<T>(
operationId, name, func, config, serializer, ChildFactory,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default)
=> RunInvoke<TPayload, TResult>(
functionName, payload,
name, config, cancellationToken);

private Task<TResult> RunInvoke<TPayload, TResult>(
string functionName,
TPayload payload,
string? name,
InvokeConfig? config,
CancellationToken cancellationToken)
{
// Argument validation runs synchronously at the call site (matches the
// .NET convention of failing fast for misuse). Match Python/JS/Java
// parity: only check for null/empty here; the durable execution service
// enforces the qualified-ARN rule and surfaces a precise error when an
// unqualified identifier is used.
ArgumentNullException.ThrowIfNull(functionName);
if (string.IsNullOrWhiteSpace(functionName))
throw new ArgumentException("Function name must not be empty or whitespace.", nameof(functionName));

var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
"(or in tests, set TestLambdaContext.Serializer).");

cancellationToken.ThrowIfCancellationRequested();

var operationId = _idGenerator.NextId();
var op = new InvokeOperation<TPayload, TResult>(
operationId, name, functionName, payload, config,
serializer,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
}

internal sealed class DurableExecutionContext : IExecutionContext
{
public DurableExecutionContext(string durableExecutionArn)
{
DurableExecutionArn = durableExecutionArn;
}

public string DurableExecutionArn { get; }
}

internal sealed class StepContext : IStepContext
{
public StepContext(string operationId, int attemptNumber, ILogger logger)
{
OperationId = operationId;
AttemptNumber = attemptNumber;
Logger = logger;
}

public ILogger Logger { get; }
public int AttemptNumber { get; }
public string OperationId { get; }
}
Loading
Loading