diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index e79cee30b..19251a60a 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -1,7 +1,6 @@ using Amazon.Lambda.Core; using Amazon.Lambda.DurableExecution.Internal; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; namespace Amazon.Lambda.DurableExecution; @@ -17,6 +16,7 @@ internal sealed class DurableContext : IDurableContext private readonly OperationIdGenerator _idGenerator; private readonly string _durableExecutionArn; private readonly CheckpointBatcher? _batcher; + private ILogger _logger; public DurableContext( ExecutionState state, @@ -32,13 +32,25 @@ public DurableContext( _durableExecutionArn = durableExecutionArn; _batcher = batcher; LambdaContext = lambdaContext; + _logger = new ReplayAwareLogger(new LambdaCoreLogger(), state, modeAware: true); } - // Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc. - public ILogger Logger => NullLogger.Instance; + public ILogger Logger => _logger; public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn); public ILambdaContext LambdaContext { get; } + public void ConfigureLogger(LoggerConfig config) + { + if (config == null) throw new ArgumentNullException(nameof(config)); + + // If the user supplies a CustomLogger, wrap it. Otherwise re-wrap the + // existing inner logger (unwrapping if it was already a ReplayAwareLogger) + // so toggling ModeAware works without losing the previous custom logger. + var inner = config.CustomLogger + ?? (_logger is ReplayAwareLogger existing ? existing.Inner : _logger); + _logger = new ReplayAwareLogger(inner, _state, config.ModeAware); + } + public Task StepAsync( Func> func, string? name = null, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs index e0d03747c..1cfe2c9ce 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs @@ -7,6 +7,7 @@ using Amazon.Lambda.DurableExecution.Services; using Amazon.Lambda.Model; using Amazon.Runtime; +using Microsoft.Extensions.Logging; namespace Amazon.Lambda.DurableExecution; @@ -111,9 +112,20 @@ private static async Task WrapAsyncCore result; try { - result = await DurableExecutionHandler.RunAsync( - state, terminationManager, - async () => await workflow(userPayload, context)); + // Push execution-level metadata into a logging scope so structured + // providers (the runtime's JSON formatter, Serilog, Powertools, + // etc.) tag every log line emitted by user code with the + // execution ARN and request id. + using (context.Logger.BeginScope(new Dictionary + { + ["durableExecutionArn"] = invocationInput.DurableExecutionArn, + ["awsRequestId"] = lambdaContext.AwsRequestId ?? string.Empty, + })) + { + result = await DurableExecutionHandler.RunAsync( + state, terminationManager, + async () => await workflow(userPayload, context)); + } await batcher.DrainAsync(); } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index fb49d9e01..1ee5d2e2e 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -12,13 +12,21 @@ namespace Amazon.Lambda.DurableExecution; public interface IDurableContext { /// - /// A logger scoped to the durable execution. Currently returns - /// ; - /// the replay-safe DurableLogger (suppresses messages during replay) - /// ships in a follow-up PR. + /// Replay-safe logger. Messages emitted while the workflow is re-deriving + /// prior operations from checkpointed state are suppressed by default, so + /// a 30-step workflow re-invoked 30 times still emits each line once. + /// Use this instead of Console.WriteLine or other ambient loggers, + /// which will repeat on every replay. Replace the underlying logger or + /// disable replay-aware filtering via . /// ILogger Logger { get; } + /// + /// Swap the underlying logger or toggle replay-aware filtering. Idempotent — + /// later calls overwrite earlier configuration. + /// + void ConfigureLogger(LoggerConfig config); + /// /// Metadata about the current durable execution. /// @@ -68,7 +76,10 @@ Task WaitAsync( public interface IStepContext { /// - /// Logger scoped to this step. + /// Logger scoped to this step. Same instance as + /// ; emits within an + /// that carries the step's + /// operationId, operationName, and attempt. /// ILogger Logger { get; } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/LambdaCoreLogger.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/LambdaCoreLogger.cs new file mode 100644 index 000000000..1ffef5ec0 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/LambdaCoreLogger.cs @@ -0,0 +1,161 @@ +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.Logging; +using CoreLambdaLogger = Amazon.Lambda.Core.LambdaLogger; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Default for . Routes log +/// records through so they flow into the same +/// pipeline used by the rest of the AWS Lambda for .NET runtime — the runtime +/// host installs a redirector that produces structured JSON when +/// AWS_LAMBDA_LOG_FORMAT=JSON and honors AWS_LAMBDA_LOG_LEVEL. +/// +/// +/// In-package adapter to avoid forcing a dependency on +/// Amazon.Lambda.Logging.AspNetCore; users who want a richer experience +/// (Serilog, Powertools, etc.) can swap their own logger via +/// . +/// +/// When state is the standard FormattedLogValues produced by +/// , the original template and named arguments +/// are forwarded so the runtime's JSON formatter surfaces named placeholders +/// ({OrderId}) as top-level structured attributes. Mirrors the pattern +/// in Amazon.Lambda.Logging.AspNetCore.LambdaILogger. +/// +/// maintains an chain of +/// scope state. Scopes whose state is a key/value collection have each entry +/// appended to the outgoing template/args, so structured scope metadata +/// (durableExecutionArn, operationId, etc.) shows up as +/// top-level JSON fields without callers having to swap in a third-party +/// logger. Inner scopes win on key collision; explicit message arguments +/// always win over scope keys. +/// +internal sealed class LambdaCoreLogger : ILogger +{ + private const string OriginalFormatKey = "{OriginalFormat}"; + + private static readonly AsyncLocal CurrentScope = new(); + + public IDisposable BeginScope(TState state) where TState : notnull + { + var scope = new Scope(state, CurrentScope.Value); + CurrentScope.Value = scope; + return scope; + } + + // Level filtering is performed by the runtime layer (AWS_LAMBDA_LOG_LEVEL). + public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + if (!IsEnabled(logLevel)) return; + + string? messageTemplate = null; + var parameters = new List(); + HashSet? claimedKeys = null; + + if (state is IEnumerable> structure) + { + foreach (var property in structure) + { + if (property is { Key: OriginalFormatKey, Value: string value }) + { + messageTemplate = value; + } + else + { + parameters.Add(property.Value!); + claimedKeys ??= new HashSet(StringComparer.Ordinal); + claimedKeys.Add(property.Key); + } + } + + // No {OriginalFormat} → not a real FormattedLogValues; ignore the args + // we collected and fall back to the formatter below. + if (messageTemplate == null) + { + parameters.Clear(); + claimedKeys = null; + } + } + + messageTemplate ??= formatter(state, exception); + + AppendScopeAttributes(ref messageTemplate, parameters, ref claimedKeys); + + var levelName = logLevel.ToString(); + var args = parameters.Count == 0 ? Array.Empty() : parameters.ToArray(); + if (exception != null) + { + CoreLambdaLogger.Log(levelName, exception, messageTemplate, args); + } + else + { + CoreLambdaLogger.Log(levelName, messageTemplate, args); + } + } + + private static void AppendScopeAttributes( + ref string messageTemplate, + List parameters, + ref HashSet? claimedKeys) + { + var current = CurrentScope.Value; + if (current == null) return; + + StringBuilder? sb = null; + + // Walk innermost → outermost so the first key seen for a given name wins + // (mirrors how Microsoft.Extensions.Logging structured providers resolve + // overlapping scope keys: the closest scope dominates). + for (var s = current; s != null; s = s.Parent) + { + if (s.State is not IEnumerable> kvps) continue; + foreach (var kvp in kvps) + { + // Skip {OriginalFormat} (some scope-state factories emit one). + if (kvp.Key == OriginalFormatKey) continue; + + claimedKeys ??= new HashSet(StringComparer.Ordinal); + if (!claimedKeys.Add(kvp.Key)) continue; + + sb ??= new StringBuilder(messageTemplate); + sb.Append(' ').Append('{').Append(kvp.Key).Append('}'); + parameters.Add(kvp.Value!); + } + } + + if (sb != null) messageTemplate = sb.ToString(); + } + + private sealed class Scope : IDisposable + { + public object State { get; } + public Scope? Parent { get; } + private bool _disposed; + + public Scope(object state, Scope? parent) + { + State = state; + Parent = parent; + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + // Restore the parent. Out-of-order disposal would desync the chain, + // but that violates the using-statement contract that callers rely + // on; we don't try to defend against it. + CurrentScope.Value = Parent; + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ReplayAwareLogger.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ReplayAwareLogger.cs new file mode 100644 index 000000000..5e24f53fe --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ReplayAwareLogger.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.Logging; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// decorator that suppresses messages while the workflow +/// is replaying prior operations. Reads +/// on every call so it correctly transitions to passthrough the moment the +/// state's per-operation tracker decides we've caught up to fresh execution. +/// +/// +/// Mirrors the suppression behavior of the Python and Java durable execution +/// SDKs: replay calls return without invoking the +/// inner logger. always delegates so scopes +/// stay balanced — suppression only applies at log emission. +/// +internal sealed class ReplayAwareLogger : ILogger +{ + private readonly ILogger _inner; + private readonly ExecutionState _state; + private readonly bool _modeAware; + + public ReplayAwareLogger(ILogger inner, ExecutionState state, bool modeAware) + { + _inner = inner; + _state = state; + _modeAware = modeAware; + } + + /// The wrapped logger; exposed so ConfigureLogger can rewrap without losing it. + public ILogger Inner => _inner; + + /// Whether replay suppression is active. + public bool ModeAware => _modeAware; + + public IDisposable? BeginScope(TState state) where TState : notnull + => _inner.BeginScope(state); + + public bool IsEnabled(LogLevel logLevel) + { + if (ShouldSuppress()) return false; + return _inner.IsEnabled(logLevel); + } + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + if (ShouldSuppress()) return; + _inner.Log(logLevel, eventId, state, exception, formatter); + } + + private bool ShouldSuppress() => _modeAware && _state.IsReplaying; +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs index ffdaaffd7..e37924634 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs @@ -198,7 +198,21 @@ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellat try { var stepContext = new StepContext(OperationId, attemptNumber, _logger); - var result = await _func(stepContext); + + // Step-scoped metadata so structured log providers tag user code + // lines with the operation id, name, and current attempt. Wrap + // only the user-func call — checkpoint emission shouldn't carry + // step metadata into any side-channel logging. + T result; + using (_logger.BeginScope(new Dictionary + { + ["operationId"] = OperationId, + ["operationName"] = Name ?? string.Empty, + ["attempt"] = attemptNumber, + })) + { + result = await _func(stepContext); + } await EnqueueAsync(new SdkOperationUpdate { diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/LoggerConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/LoggerConfig.cs new file mode 100644 index 000000000..801d0a7c9 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/LoggerConfig.cs @@ -0,0 +1,24 @@ +using Microsoft.Extensions.Logging; + +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for . Lets users +/// swap the underlying (e.g. Serilog, AWS Lambda Powertools) +/// or disable replay-aware filtering for debugging. +/// +public sealed class LoggerConfig +{ + /// + /// Optional to use instead of the SDK default. When + /// null, the durable context keeps its existing inner logger. + /// + public ILogger? CustomLogger { get; init; } + + /// + /// When true (default), messages are suppressed while the workflow is + /// re-deriving prior operations from checkpointed state. Set to false to + /// see every log line on every replay (useful for local debugging). + /// + public bool ModeAware { get; init; } = true; +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/Amazon.Lambda.DurableExecution.IntegrationTests.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/Amazon.Lambda.DurableExecution.IntegrationTests.csproj index 0ef2e561d..8dda2b047 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/Amazon.Lambda.DurableExecution.IntegrationTests.csproj +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/Amazon.Lambda.DurableExecution.IntegrationTests.csproj @@ -31,6 +31,7 @@ + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayAwareLoggerTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayAwareLoggerTest.cs new file mode 100644 index 000000000..8280ed6a3 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayAwareLoggerTest.cs @@ -0,0 +1,197 @@ +using System.Linq; +using System.Text; +using System.Text.Json; +using Amazon.CloudWatchLogs; +using Amazon.CloudWatchLogs.Model; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +/// +/// End-to-end proof of the replay-aware logger: a workflow with a Wait between +/// two steps re-invokes Lambda once. Lines emitted via +/// context.Logger.LogInformation in the workflow body and after step 1 +/// must appear ONCE in CloudWatch (suppressed on the replay invocation), +/// while parallel Console.WriteLine control lines must appear TWICE +/// (proving the function genuinely replayed). +/// +public class ReplayAwareLoggerTest +{ + private readonly ITestOutputHelper _output; + public ReplayAwareLoggerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task ReplayAwareLogger_SuppressesDuplicateLogsOnReplay() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("ReplayAwareLoggerFunction"), + "logreplay", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "log-replay"}"""); + Assert.Equal(200, invokeResponse.StatusCode); + + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // Sanity check the durable history: two step events, one wait, one + // re-invocation. Confirms the workflow really did replay. + await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2 + && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + + // CloudWatch is eventually consistent — wait until ALL log lines we + // expect have been ingested. The stop condition demands the full + // expected count of every marker so the test never short-circuits with + // a still-arriving "after_step1" record (which is emitted at a + // different timestamp than workflow_start and indexes independently). + using var logs = new AmazonCloudWatchLogsClient(RegionEndpoint.USEast1); + var logGroup = $"/aws/lambda/{deployment.FunctionName}"; + + var allEvents = await PollForLogEvents( + logs, logGroup, + stopWhen: events => + // Replay-aware: 1 each (suppressed on the second invocation). + CountMatching(events, "LOG_REPLAY_TEST workflow_start") >= 1 && + CountMatching(events, "LOG_REPLAY_TEST inside_step1") >= 1 && + CountMatching(events, "LOG_REPLAY_TEST after_step1") >= 1 && + CountMatching(events, "LOG_REPLAY_TEST workflow_end") >= 1 && + // Control: workflow_start and after_step1 emit on both + // invocations (2 each); workflow_end only on the second (1). + CountMatching(events, "LOG_REPLAY_CONTROL workflow_start") >= 2 && + CountMatching(events, "LOG_REPLAY_CONTROL after_step1") >= 2 && + CountMatching(events, "LOG_REPLAY_CONTROL workflow_end") >= 1, + timeout: TimeSpan.FromMinutes(2)); + + var messages = allEvents.Select(e => e.Message ?? string.Empty).ToList(); + _output.WriteLine($"Collected {messages.Count} log events from {logGroup}"); + + // Replay-aware lines: each must appear exactly once across both invocations. + Assert.Equal(1, CountMatching(messages, "LOG_REPLAY_TEST workflow_start")); + Assert.Equal(1, CountMatching(messages, "LOG_REPLAY_TEST inside_step1")); + Assert.Equal(1, CountMatching(messages, "LOG_REPLAY_TEST after_step1")); + Assert.Equal(1, CountMatching(messages, "LOG_REPLAY_TEST workflow_end")); + + // Control lines (Console.WriteLine, not replay-aware): the + // workflow-start and after_step1 markers run on both invocations and + // must appear twice; workflow_end runs only on the second invocation + // (after the Wait completes) so it appears once. + Assert.Equal(2, CountMatching(messages, "LOG_REPLAY_CONTROL workflow_start")); + Assert.Equal(2, CountMatching(messages, "LOG_REPLAY_CONTROL after_step1")); + Assert.Equal(1, CountMatching(messages, "LOG_REPLAY_CONTROL workflow_end")); + + // The function runs with AWS_LAMBDA_LOG_FORMAT=JSON, so the runtime + // emits one JSON object per log record. The replay-aware lines were + // emitted under DurableFunction's execution-level BeginScope; the + // inside_step1 line was additionally inside StepOperation's per-step + // BeginScope. LambdaCoreLogger appends the scope KVPs as named + // placeholders, which the runtime's JSON formatter promotes to + // top-level fields. Verify that. + AssertScopeFieldsOnRecord(messages, "LOG_REPLAY_TEST workflow_start", + requireExecutionScope: true, requireStepScope: false); + AssertScopeFieldsOnRecord(messages, "LOG_REPLAY_TEST inside_step1", + requireExecutionScope: true, requireStepScope: true); + } + + private void AssertScopeFieldsOnRecord( + List messages, string substring, + bool requireExecutionScope, bool requireStepScope) + { + var record = messages.FirstOrDefault(m => m.Contains(substring, StringComparison.Ordinal)); + Assert.NotNull(record); + + // CloudWatch occasionally prefixes the JSON line with text (e.g., when + // the runtime falls back to plain stdout); slice from the first '{'. + var braceIdx = record!.IndexOf('{'); + Assert.True(braceIdx >= 0, $"No JSON object in record: {record}"); + + using var doc = JsonDocument.Parse(record[braceIdx..]); + var root = doc.RootElement; + _output.WriteLine($"[scope-check] {substring} → {record[braceIdx..]}"); + + if (requireExecutionScope) + { + Assert.True(root.TryGetProperty("durableExecutionArn", out _), + $"durableExecutionArn missing on record: {record}"); + Assert.True(root.TryGetProperty("awsRequestId", out _), + $"awsRequestId missing on record: {record}"); + } + if (requireStepScope) + { + Assert.True(root.TryGetProperty("operationId", out _), + $"operationId missing on record: {record}"); + Assert.True(root.TryGetProperty("operationName", out _), + $"operationName missing on record: {record}"); + Assert.True(root.TryGetProperty("attempt", out _), + $"attempt missing on record: {record}"); + } + } + + private static int CountMatching(IEnumerable events, string substring) + => events.Count(e => e.Message != null && e.Message.Contains(substring, StringComparison.Ordinal)); + + private static int CountMatching(IEnumerable messages, string substring) + => messages.Count(m => m.Contains(substring, StringComparison.Ordinal)); + + private async Task> PollForLogEvents( + IAmazonCloudWatchLogs logs, + string logGroupName, + Func, bool> stopWhen, + TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + var attempt = 0; + var lastSeen = new List(); + + // Filter only on our marker prefix to keep payload size small. + const string filterPattern = "\"LOG_REPLAY_\""; + + while (DateTime.UtcNow < deadline) + { + attempt++; + try + { + var events = new List(); + string? nextToken = null; + do + { + var resp = await logs.FilterLogEventsAsync(new FilterLogEventsRequest + { + LogGroupName = logGroupName, + FilterPattern = filterPattern, + NextToken = nextToken, + }); + if (resp.Events != null) events.AddRange(resp.Events); + nextToken = resp.NextToken; + } while (!string.IsNullOrEmpty(nextToken)); + + _output.WriteLine($"[CW poll {attempt}] events={events.Count}"); + lastSeen = events; + if (stopWhen(events)) return events; + } + catch (Amazon.CloudWatchLogs.Model.ResourceNotFoundException) + { + // Log group not yet provisioned — Lambda creates it on first + // invocation, but it can lag behind the function being Active. + _output.WriteLine($"[CW poll {attempt}] log group not yet present: {logGroupName}"); + } + catch (Exception ex) + { + _output.WriteLine($"[CW poll {attempt}] error (will retry): {ex.Message}"); + } + await Task.Delay(TimeSpan.FromSeconds(3)); + } + + _output.WriteLine($"[CW poll] gave up after {attempt} attempts; returning last-seen ({lastSeen.Count} events)"); + return lastSeen; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Dockerfile new file mode 100644 index 000000000..92f5263e9 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Dockerfile @@ -0,0 +1,12 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +# Emit structured JSON logs so the integration test can parse log records and +# assert the durable-execution scope keys (durableExecutionArn, operationId, +# etc.) appear as top-level fields. +ENV AWS_LAMBDA_LOG_FORMAT=JSON + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs new file mode 100644 index 000000000..dbbcc24a9 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs @@ -0,0 +1,75 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using Microsoft.Extensions.Logging; + +namespace DurableExecutionTestFunction; + +/// +/// Workflow used by ReplayAwareLoggerTest. Pairs each replay-aware +/// context.Logger.LogInformation line with a control +/// Console.WriteLine so the test can prove the SDK suppresses replay +/// duplicates: the LogInformation lines should appear exactly once across the +/// two invocations a Wait-driven workflow produces, while the Console.WriteLine +/// control lines should appear once per invocation. +/// +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Workflow-level: emitted on invocation 1, suppressed on invocation 2 (replay). + context.Logger.LogInformation("LOG_REPLAY_TEST workflow_start order={OrderId}", input.OrderId); + Console.WriteLine($"LOG_REPLAY_CONTROL workflow_start order={input.OrderId}"); + + var step1 = await context.StepAsync( + async (_) => + { + // Emitted inside the step's BeginScope, so the line carries + // both execution-level scope (durableExecutionArn, awsRequestId) + // AND step-level scope (operationId, operationName, attempt). + context.Logger.LogInformation("LOG_REPLAY_TEST inside_step1 order={OrderId}", input.OrderId); + await Task.CompletedTask; + return $"validated-{input.OrderId}"; + }, + name: "validate"); + + // Between-step log: invocation 1 emits, invocation 2 is still in Replay + // (Wait-on-SUCCEEDED replay does not flip the mode), so it must be suppressed. + context.Logger.LogInformation("LOG_REPLAY_TEST after_step1 result={Result}", step1); + Console.WriteLine($"LOG_REPLAY_CONTROL after_step1 result={step1}"); + + await context.WaitAsync(TimeSpan.FromSeconds(3), name: "short_wait"); + + // Step 2 runs fresh on invocation 2 — its EnterExecutionMode flips the + // logger from suppress to passthrough. The next LogInformation lands. + var step2 = await context.StepAsync( + async (_) => + { + await Task.CompletedTask; + return $"processed-{step1}"; + }, + name: "process"); + + context.Logger.LogInformation("LOG_REPLAY_TEST workflow_end final={Final}", step2); + Console.WriteLine($"LOG_REPLAY_CONTROL workflow_end final={step2}"); + + return new TestResult { Status = "completed", Data = step2 }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/ReplayAwareLoggerFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/ReplayAwareLoggerFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/ReplayAwareLoggerFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs index 50e511cb1..14ea81b23 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs @@ -2,8 +2,11 @@ using Amazon.Lambda.DurableExecution; using Amazon.Lambda.DurableExecution.Internal; using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.DurableExecution.Tests.Internal; using Amazon.Lambda.TestUtilities; +using Microsoft.Extensions.Logging; using Xunit; +using LogLevel = Microsoft.Extensions.Logging.LogLevel; namespace Amazon.Lambda.DurableExecution.Tests; @@ -222,10 +225,61 @@ public async Task StepAsync_NoSerializerOnContext_ThrowsInvalidOperation() } [Fact] - public void Logger_Defaults_ToNullLogger() + public void Logger_Default_IsReplayAwareLogger() { var context = CreateContext(); Assert.NotNull(context.Logger); + Assert.IsType(context.Logger); + } + + [Fact] + public void ConfigureLogger_NullArg_Throws() + { + var context = CreateContext(); + Assert.Throws(() => context.ConfigureLogger(null!)); + } + + [Fact] + public void ConfigureLogger_WithCustomLogger_ReachesUserLogger() + { + var context = CreateContext(); + var custom = new RecordingLogger(); + context.ConfigureLogger(new LoggerConfig { CustomLogger = custom }); + + // Default state has no checkpoint → starts in Execution mode, so + // logs flow through immediately. + context.Logger.LogInformation("hi"); + + Assert.Single(custom.Records); + Assert.Equal(LogLevel.Information, custom.Records[0].Level); + } + + [Fact] + public void ConfigureLogger_ModeAwareFalse_LogsDuringReplay() + { + // Seed a checkpoint so the context starts in Replay mode. + var custom = new RecordingLogger(); + var context = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(99), + Type = OperationTypes.Step, + Status = OperationStatuses.Succeeded + } + } + }); + + context.ConfigureLogger(new LoggerConfig { CustomLogger = custom, ModeAware = true }); + context.Logger.LogInformation("replay-default"); + Assert.Empty(custom.Records); + + context.ConfigureLogger(new LoggerConfig { ModeAware = false }); + context.Logger.LogInformation("replay-disabled"); + Assert.Single(custom.Records); + Assert.Contains("replay-disabled", custom.Records[0].Message); } [Fact] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/LambdaCoreLoggerTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/LambdaCoreLoggerTests.cs new file mode 100644 index 000000000..0bd7b50d6 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/LambdaCoreLoggerTests.cs @@ -0,0 +1,268 @@ +using System.Reflection; +using Amazon.Lambda.DurableExecution.Internal; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests.Internal; + +/// +/// Asserts that LambdaCoreLogger preserves the original message template and +/// named placeholder arguments when forwarding to Amazon.Lambda.Core.LambdaLogger. +/// This is the contract that lets the Lambda runtime's JSON formatter emit +/// {OrderId}-style fields as top-level structured attributes. +/// +public class LambdaCoreLoggerTests : IDisposable +{ + private readonly Action? _originalLevelAction; + private readonly Action? _originalLevelAndExAction; + // The capturing delegates are invoked from concurrent tasks in the AsyncLocal + // test — guard list mutation with a lock. + private readonly object _captureLock = new(); + private readonly List<(string Level, string Template, object[] Args, Exception? Exception)> _captured = new(); + + public LambdaCoreLoggerTests() + { + _originalLevelAction = SwapLevelAction((level, template, args) => + { + lock (_captureLock) _captured.Add((level, template, args, null)); + }); + + _originalLevelAndExAction = SwapLevelAndExceptionAction((level, ex, template, args) => + { + lock (_captureLock) _captured.Add((level, template, args, ex)); + }); + } + + public void Dispose() + { + if (_originalLevelAction != null) SwapLevelAction(_originalLevelAction); + if (_originalLevelAndExAction != null) SwapLevelAndExceptionAction(_originalLevelAndExAction); + } + + [Fact] + public void Log_NamedPlaceholders_ForwardsTemplateAndArgs() + { + var logger = new LambdaCoreLogger(); + + logger.LogInformation("User {OrderId} bought {Count}", "abc-123", 7); + + var entry = Assert.Single(_captured); + Assert.Equal("Information", entry.Level); + Assert.Equal("User {OrderId} bought {Count}", entry.Template); + Assert.Equal(new object[] { "abc-123", 7 }, entry.Args); + Assert.Null(entry.Exception); + } + + [Fact] + public void Log_NamedPlaceholdersWithException_ForwardsTemplateAndArgs() + { + var logger = new LambdaCoreLogger(); + var ex = new InvalidOperationException("boom"); + + logger.LogError(ex, "Failed for {OrderId}", "abc-123"); + + var entry = Assert.Single(_captured); + Assert.Equal("Error", entry.Level); + Assert.Equal("Failed for {OrderId}", entry.Template); + Assert.Equal(new object[] { "abc-123" }, entry.Args); + Assert.Same(ex, entry.Exception); + } + + [Fact] + public void Log_PlainMessage_ForwardsAsLiteralWithEmptyArgs() + { + var logger = new LambdaCoreLogger(); + + logger.LogWarning("nothing structured here"); + + var entry = Assert.Single(_captured); + Assert.Equal("Warning", entry.Level); + Assert.Equal("nothing structured here", entry.Template); + Assert.Empty(entry.Args); + } + + [Fact] + public void Log_NonKvpState_FallsBackToFormatter() + { + var logger = new LambdaCoreLogger(); + + // Direct ILogger.Log call with a custom TState that is NOT + // FormattedLogValues. The formatter must be used to render the message. + ((ILogger)logger).Log( + LogLevel.Information, + new EventId(0), + state: 42, + exception: null, + formatter: (s, _) => $"value={s}"); + + var entry = Assert.Single(_captured); + Assert.Equal("Information", entry.Level); + Assert.Equal("value=42", entry.Template); + Assert.Empty(entry.Args); + } + + [Fact] + public void IsEnabled_None_ReturnsFalse() + { + var logger = new LambdaCoreLogger(); + Assert.False(logger.IsEnabled(LogLevel.None)); + Assert.True(logger.IsEnabled(LogLevel.Trace)); + } + + [Fact] + public void Log_WithKvpScope_AppendsScopeKeysToTemplateAndArgs() + { + var logger = new LambdaCoreLogger(); + + using (logger.BeginScope(new Dictionary + { + ["operationId"] = "op-1", + ["attempt"] = 2, + })) + { + logger.LogInformation("step done {Result}", "ok"); + } + + var entry = Assert.Single(_captured); + // The template's own placeholders come first; scope keys are appended. + Assert.Equal("step done {Result} {operationId} {attempt}", entry.Template); + Assert.Equal(new object[] { "ok", "op-1", 2 }, entry.Args); + } + + [Fact] + public void Log_WithNestedKvpScopes_InnerWinsAndOrderInnerToOuter() + { + var logger = new LambdaCoreLogger(); + + using (logger.BeginScope(new Dictionary + { + ["durableExecutionArn"] = "arn-outer", + ["awsRequestId"] = "req-1", + })) + using (logger.BeginScope(new Dictionary + { + ["operationId"] = "op-1", + ["awsRequestId"] = "req-INNER-WINS", // overrides outer + })) + { + logger.LogInformation("hello {Name}", "world"); + } + + var entry = Assert.Single(_captured); + // Inner scope keys appear before outer; the inner awsRequestId wins. + Assert.Equal( + "hello {Name} {operationId} {awsRequestId} {durableExecutionArn}", + entry.Template); + Assert.Equal( + new object[] { "world", "op-1", "req-INNER-WINS", "arn-outer" }, + entry.Args); + } + + [Fact] + public void Log_MessageArgWinsOverScopeKeyWithSameName() + { + var logger = new LambdaCoreLogger(); + + using (logger.BeginScope(new Dictionary + { + ["OrderId"] = "from-scope", + })) + { + logger.LogInformation("processing {OrderId}", "from-message"); + } + + var entry = Assert.Single(_captured); + // Scope key OrderId is dropped because the explicit message arg already + // claimed it; the runtime formatter sees only the explicit value. + Assert.Equal("processing {OrderId}", entry.Template); + Assert.Equal(new object[] { "from-message" }, entry.Args); + } + + [Fact] + public void BeginScope_PopsOnDispose_NoLeakAcrossLogCalls() + { + var logger = new LambdaCoreLogger(); + + using (logger.BeginScope(new Dictionary { ["scoped"] = "yes" })) + { + logger.LogInformation("inside"); + } + logger.LogInformation("outside"); + + Assert.Equal(2, _captured.Count); + Assert.Equal("inside {scoped}", _captured[0].Template); + Assert.Equal(new object[] { "yes" }, _captured[0].Args); + // After the using-block, the scope is popped; the second log carries no + // appended scope keys. + Assert.Equal("outside", _captured[1].Template); + Assert.Empty(_captured[1].Args); + } + + [Fact] + public async Task BeginScope_IsAsyncLocal_DoesNotLeakAcrossTasks() + { + var logger = new LambdaCoreLogger(); + var sibling1Captured = new List<(string Template, object[] Args)>(); + var sibling2Captured = new List<(string Template, object[] Args)>(); + + var inflight = new TaskCompletionSource(); + + async Task Sibling(string id, List<(string, object[])> sink) + { + using (logger.BeginScope(new Dictionary { ["taskId"] = id })) + { + // Yield to give the other task a chance to run with its own scope. + await Task.Yield(); + logger.LogInformation("emit"); + inflight.TrySetResult(); + } + } + + // Replace the capture sink temporarily so we can route per-task. + // Easiest: just inspect _captured, since the AsyncLocal scope chain is + // what we care about — order doesn't matter. + await Task.WhenAll(Sibling("A", sibling1Captured), Sibling("B", sibling2Captured)); + + Assert.Equal(2, _captured.Count); + var taskIds = _captured.Select(c => c.Args.Single()).OrderBy(v => v).ToArray(); + Assert.Equal(new object[] { "A", "B" }, taskIds); + Assert.All(_captured, c => Assert.Equal("emit {taskId}", c.Template)); + } + + [Fact] + public void BeginScope_NonKvpScope_Ignored() + { + var logger = new LambdaCoreLogger(); + + using (logger.BeginScope("just-a-string")) + { + logger.LogInformation("hello"); + } + + var entry = Assert.Single(_captured); + // String scopes don't carry keys; nothing to append. + Assert.Equal("hello", entry.Template); + Assert.Empty(entry.Args); + } + + private static Action? SwapLevelAction(Action replacement) + { + var field = typeof(Amazon.Lambda.Core.LambdaLogger).GetField( + "_loggingWithLevelAction", + BindingFlags.NonPublic | BindingFlags.Static)!; + var original = (Action?)field.GetValue(null); + field.SetValue(null, replacement); + return original; + } + + private static Action? SwapLevelAndExceptionAction( + Action replacement) + { + var field = typeof(Amazon.Lambda.Core.LambdaLogger).GetField( + "_loggingWithLevelAndExceptionAction", + BindingFlags.NonPublic | BindingFlags.Static)!; + var original = (Action?)field.GetValue(null); + field.SetValue(null, replacement); + return original; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/ReplayAwareLoggerTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/ReplayAwareLoggerTests.cs new file mode 100644 index 000000000..dc92ccf79 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/Internal/ReplayAwareLoggerTests.cs @@ -0,0 +1,153 @@ +using Amazon.Lambda.DurableExecution.Internal; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests.Internal; + +public class ReplayAwareLoggerTests +{ + private const string SeedOpId = "seed"; + + private static ExecutionState ReplayState() + { + // Seed one completed user-replayable op so IsReplaying starts true. + // The op is NOT yet visited via TrackReplay, so we stay in replay. + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() { Id = SeedOpId, Type = OperationTypes.Step, Status = OperationStatuses.Succeeded } + } + }); + Assert.True(state.IsReplaying); + return state; + } + + [Fact] + public void Log_DuringReplay_Suppressed() + { + var inner = new RecordingLogger(); + var logger = new ReplayAwareLogger(inner, ReplayState(), modeAware: true); + + logger.LogInformation("hello"); + + Assert.Empty(inner.Records); + } + + [Fact] + public void Log_DuringExecution_Passthrough() + { + var state = ReplayState(); + state.TrackReplay(SeedOpId); + var inner = new RecordingLogger(); + var logger = new ReplayAwareLogger(inner, state, modeAware: true); + + logger.LogInformation("hello"); + + Assert.Single(inner.Records); + Assert.Equal(LogLevel.Information, inner.Records[0].Level); + } + + [Fact] + public void Log_ModeAwareFalse_AlwaysLogs() + { + var inner = new RecordingLogger(); + var logger = new ReplayAwareLogger(inner, ReplayState(), modeAware: false); + + logger.LogWarning("still here"); + + Assert.Single(inner.Records); + } + + [Fact] + public void IsEnabled_DuringReplay_ReturnsFalse() + { + var inner = new RecordingLogger { ForcedEnabled = true }; + var logger = new ReplayAwareLogger(inner, ReplayState(), modeAware: true); + + Assert.False(logger.IsEnabled(LogLevel.Information)); + } + + [Fact] + public void IsEnabled_DuringExecution_DelegatesToInner() + { + var state = ReplayState(); + state.TrackReplay(SeedOpId); + var inner = new RecordingLogger { ForcedEnabled = false }; + var logger = new ReplayAwareLogger(inner, state, modeAware: true); + + Assert.False(logger.IsEnabled(LogLevel.Information)); + + inner.ForcedEnabled = true; + Assert.True(logger.IsEnabled(LogLevel.Information)); + } + + [Fact] + public void BeginScope_AlwaysDelegates() + { + var inner = new RecordingLogger(); + var logger = new ReplayAwareLogger(inner, ReplayState(), modeAware: true); + + // Even during replay, scopes must pass through to keep the scope stack + // balanced. + using (logger.BeginScope("scope-during-replay")) + { + Assert.Equal(1, inner.OpenScopes); + } + Assert.Equal(0, inner.OpenScopes); + } + + [Fact] + public void Log_TransitionsFromReplayToExecution() + { + // Mirror Python's test_logger_replay_then_new_logging: while the state + // is replaying the logger drops messages, but the moment TrackReplay + // visits the last checkpointed op IsReplaying flips and the next log + // line lands. + var state = ReplayState(); + var inner = new RecordingLogger(); + var logger = new ReplayAwareLogger(inner, state, modeAware: true); + + logger.LogInformation("during replay"); + Assert.Empty(inner.Records); + + state.TrackReplay(SeedOpId); + logger.LogInformation("after transition"); + + Assert.Single(inner.Records); + Assert.Contains("after transition", inner.Records[0].Message); + } +} + +internal sealed class RecordingLogger : ILogger +{ + public List<(LogLevel Level, string Message)> Records { get; } = new(); + public int OpenScopes { get; private set; } + public bool ForcedEnabled { get; set; } = true; + + public IDisposable BeginScope(TState state) where TState : notnull + { + OpenScopes++; + return new ScopeToken(this); + } + + public bool IsEnabled(LogLevel logLevel) => ForcedEnabled; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + Records.Add((logLevel, formatter(state, exception))); + } + + private sealed class ScopeToken : IDisposable + { + private readonly RecordingLogger _owner; + public ScopeToken(RecordingLogger owner) => _owner = owner; + public void Dispose() => _owner.OpenScopes--; + } +}