Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 72 additions & 68 deletions Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1197,10 +1197,9 @@ public class StepConfig
public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry;

// Note: there is no Serializer property here. Step result serialization
// is delegated to the ILambdaSerializer registered on ILambdaContext.Serializer
// (assembly attribute or LambdaBootstrapBuilder.Create). To swap the
// step-checkpoint format for a single step, the planned route is the
// StepAsync(..., ICheckpointSerializer<T>, ...) overload (post-v1).
// is delegated to the ILambdaSerializer registered on
// ILambdaContext.Serializer (assembly attribute or
// LambdaBootstrapBuilder.Create).
}

public enum StepSemantics
Expand Down Expand Up @@ -1231,10 +1230,9 @@ public class CallbackConfig
/// </summary>
public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Custom serializer for callback result.
/// </summary>
public ICheckpointSerializer? Serializer { get; set; }
// Note: there is no Serializer property here. Callback result
// serialization flows through the ILambdaSerializer registered on
// ILambdaContext.Serializer, the same as StepAsync.
}

/// <summary>
Expand All @@ -1259,14 +1257,14 @@ public class InvokeConfig
public TimeSpan Timeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Custom serializer for the payload.
/// Optional tenant identifier propagated to the chained invocation.
/// Matches the tenantId field on Python/JS/Java InvokeConfig.
/// </summary>
public ICheckpointSerializer? PayloadSerializer { get; set; }
public string? TenantId { get; set; }

/// <summary>
/// Custom serializer for the result.
/// </summary>
public ICheckpointSerializer? ResultSerializer { get; set; }
// Note: there are no payload/result serializer properties here. Both
// flow through the ILambdaSerializer registered on
// ILambdaContext.Serializer, the same as StepAsync.
}

/// <summary>
Expand Down Expand Up @@ -1381,10 +1379,9 @@ public class CompletionConfig
/// </summary>
public class ChildContextConfig
{
/// <summary>
/// Custom serializer for the child context's return value.
/// </summary>
public ICheckpointSerializer? Serializer { get; set; }
// Note: there is no Serializer property here. The child context's
// return value is serialized via the ILambdaSerializer registered on
// ILambdaContext.Serializer, the same as StepAsync.

/// <summary>
/// Operation sub-type label for observability (e.g., in test runner output).
Expand Down Expand Up @@ -1425,34 +1422,54 @@ public class WaitForConditionConfig<TState>
public interface IBatchResult<T>
{
/// <summary>
/// All items (succeeded and failed).
/// All items, in original index order.
/// </summary>
IReadOnlyList<IBatchItem<T>> All { get; }

/// <summary>
/// Only successful items.
/// Items whose Status is Succeeded.
/// </summary>
IReadOnlyList<IBatchItem<T>> Succeeded { get; }

/// <summary>
/// Only failed items.
/// Items whose Status is Failed.
/// </summary>
IReadOnlyList<IBatchItem<T>> Failed { get; }

/// <summary>
/// Get all successful results. Throws if any failed.
/// Items still in flight when the batch resolved (CompletionConfig short-circuit).
/// </summary>
IReadOnlyList<IBatchItem<T>> Started { get; }

/// <summary>
/// Get all successful results in original index order. Throws if any failed.
/// </summary>
IReadOnlyList<T> GetResults();

/// <summary>
/// Throw an exception if any item failed.
/// Get all errors from failed items.
/// </summary>
IReadOnlyList<DurableExecutionException> GetErrors();

/// <summary>
/// Throw a single aggregated exception if any item failed.
/// </summary>
void ThrowIfError();

/// <summary>
/// Why the operation completed.
/// True if any item is in the Failed state.
/// </summary>
bool HasFailure { get; }

/// <summary>
/// Why the batch resolved.
/// </summary>
CompletionReason CompletionReason { get; }

int SuccessCount { get; }
int FailureCount { get; }
int StartedCount { get; }
int TotalCount { get; }
}

public interface IBatchItem<T>
Expand All @@ -1463,7 +1480,29 @@ public interface IBatchItem<T>
DurableExecutionException? Error { get; }
}

public enum BatchItemStatus { Succeeded, Failed, Cancelled }
/// <summary>
/// Status of an individual item in a batch result.
/// Mirrors the wire-state observed at the time the batch resolved — items still
/// running when a CompletionConfig short-circuits remain in <see cref="Started"/>.
/// </summary>
public enum BatchItemStatus
{
/// <summary>
/// The branch ran to completion and produced a result.
/// </summary>
Succeeded,

/// <summary>
/// The branch ran to completion and threw.
/// </summary>
Failed,

/// <summary>
/// The branch was still in flight when the batch's CompletionConfig
/// resolved (e.g., FirstSuccessful returned before this branch finished).
/// </summary>
Started
}
public enum CompletionReason { AllCompleted, MinSuccessfulReached, FailureToleranceExceeded }

/// <summary>
Expand Down Expand Up @@ -1616,32 +1655,17 @@ public class BadResult

### Custom Serialization

Implement `ICheckpointSerializer<T>` for custom serialization:
There is no per-call serializer override on any durable-execution API. Every checkpoint — step results, callback results, invoke payloads/results, child-context results — is serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. To customize, register a different `ILambdaSerializer` for the function:

```csharp
public interface ICheckpointSerializer<T>
{
string Serialize(T value, SerializationContext context);
T Deserialize(string data, SerializationContext context);
}
// Class library mode — register via the assembly attribute.
[assembly: LambdaSerializer(typeof(MyCustomSerializer))]

public record SerializationContext(string OperationId, string DurableExecutionArn);
// Executable / custom runtime — pass to LambdaBootstrapBuilder.Create.
using var bootstrap = LambdaBootstrapBuilder.Create(handler, new MyCustomSerializer()).Build();
```

Usage — pass the serializer to the per-step `StepAsync` overload directly. This is
the only way to override the registered `ILambdaSerializer` for a single step's
checkpoint; it's intentional that there's no `StepConfig.Serializer` knob, so you
have one obvious place to opt in (and the type is `ICheckpointSerializer<T>`, not
a non-generic marker, so the compiler catches a mismatched `T`):

```csharp
var result = await context.StepAsync(
async () => await GetLargeData(),
new CompressedJsonSerializer<LargeData>(),
name: "get_data");
```

> **Status:** the `ICheckpointSerializer<T>` overload is a planned post-v1 addition. Today, all step checkpoints flow through the `ILambdaSerializer` registered on `ILambdaContext.Serializer` — see [NativeAOT compatibility](#nativeaot-compatibility) for how that's wired.
The customization applies uniformly to the whole function — there is no way today to swap the format for a single step or a single result type. See [NativeAOT compatibility](#nativeaot-compatibility) for how the registration flows in JIT vs. AOT.

### Class library vs. executable output

Expand Down Expand Up @@ -1722,27 +1746,7 @@ The SDK handles overflow transparently:

**Lambda response exceeding 6 MB:** If the final orchestration result exceeds the response payload limit, the SDK checkpoints the result before returning the `DurableExecutionInvocationOutput`. The service reads the result from the checkpoint rather than from the response body.

**Guidance for very large results:** For results that are inherently large (multi-MB payloads), use a custom `ICheckpointSerializer<T>` that offloads to external storage (S3, DynamoDB) and returns a reference. This keeps checkpoint sizes small and avoids pagination overhead:

```csharp
public class S3BackedSerializer<T> : ICheckpointSerializer<T>
{
public string Serialize(T value, SerializationContext context)
{
var key = $"results/{context.DurableExecutionArn}/{context.OperationId}";
// Upload to S3, return the key as the checkpoint value
_s3Client.PutObject(new PutObjectRequest { BucketName = _bucket, Key = key, ... });
return key;
}

public T Deserialize(string data, SerializationContext context)
{
// Download from S3 using the stored key
var response = _s3Client.GetObject(new GetObjectRequest { BucketName = _bucket, Key = data });
return JsonSerializer.Deserialize<T>(response.ResponseStream);
}
}
```
**Guidance for very large results:** For results that are inherently large (multi-MB payloads), do the offload yourself inside the step — write the payload to external storage (S3, DynamoDB) and return a reference (e.g. an S3 key) from the step. The reference is what the SDK serializes and checkpoints, so the checkpoint stays small and pagination is avoided. Subsequent steps fetch the payload from external storage on demand.

---

Expand Down Expand Up @@ -1946,7 +1950,7 @@ This is post-v1 work. For the initial release, developers test durable functions
- **Lambda runtime:** Requires the managed .NET 8 runtime or a custom runtime (`provided.al2023`) for NativeAOT deployments.
- **Durable execution service:** The function must be configured with `DurableConfig` (handled automatically by the `[DurableExecution]` source generator).
- **Qualified function identifiers:** `InvokeAsync` requires a version number, alias, or `$LATEST` — unqualified ARNs are not supported for durable invocations.
- **Serializable results:** All step return types must be JSON-serializable (or use a custom `ICheckpointSerializer<T>`).
- **Serializable results:** All step return types must be serializable by the `ILambdaSerializer` registered on `ILambdaContext.Serializer` (default: `System.Text.Json`).

---

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for a child context.
/// </summary>
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.StepAsync(..., "validate");                 // root  #1
ctx.RunInChildContextAsync(child => {           // root  #2  (the CONTEXT op)
    child.StepAsync(..., "charge");             // child #1
    child.StepAsync(..., "ship");               // child #2
}, name: "phase");
ctx.StepAsync(..., "notify");                   // root  #3

Resulting IDs:

Op Id ParentId
validate h("1") null
phase h("2") null
charge h("h("2")-1") h("2")
ship h("h("2")-2") h("2")
notify h("3") null

/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (and overloads) to run code inside one.
/// </remarks>
public sealed class ChildContextConfig
{
/// <summary>
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
/// </summary>
public string? SubType { get; set; }

/// <summary>
/// Optional function to transform exceptions thrown by the child context's
/// user function before they surface to the caller. Useful for wrapping
/// low-level errors into domain-specific exceptions.
/// </summary>
/// <remarks>
/// Applied when the user function throws (the mapped exception propagates
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
/// <c>FAILED</c> child context (the constructed
/// <see cref="ChildContextException"/> is mapped before being thrown).
/// </remarks>
public Func<Exception, Exception>? ErrorMapping { get; set; }
}
55 changes: 53 additions & 2 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private Task<T> RunStep<T>(

var operationId = _idGenerator.NextId();
var op = new StepOperation<T>(
operationId, name, func, config, serializer, Logger,
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
Expand All @@ -99,7 +99,58 @@ public Task WaitAsync(
var operationId = _idGenerator.NextId();
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
var op = new WaitOperation(
operationId, name, waitSeconds,
operationId, name, _idGenerator.ParentId, waitSeconds,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
=> RunChildContext(func, name, config, cancellationToken);

public async Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void child contexts don't carry a meaningful payload; the wrapper
// returns null so the registered ILambdaSerializer is never asked to
// serialize a real value.
await RunChildContext<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}

private Task<T> RunChildContext<T>(
Func<IDurableContext, Task<T>> func,
string? name,
ChildContextConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"In the class library programming model, register one with " +
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
"In tests, set TestLambdaContext.Serializer.");

var operationId = _idGenerator.NextId();

// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);

var op = new ChildContextOperation<T>(
operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { }
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a child context's user function fails. Surfaces from
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
/// domain-specific exception.
/// </summary>
public class ChildContextException : DurableExecutionException
{
/// <summary>
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
/// </summary>
public string? SubType { get; init; }
/// <summary>The fully-qualified type name of the original exception.</summary>
public string? ErrorType { get; init; }
/// <summary>Optional structured error data attached by the user.</summary>
public string? ErrorData { get; init; }
/// <summary>Stack trace of the original exception, captured before serialization.</summary>
public IReadOnlyList<string>? OriginalStackTrace { get; init; }

/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
public ChildContextException() { }
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
public ChildContextException(string message) : base(message) { }
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
}
Loading
Loading