diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/general/PluginExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/general/PluginExample.java new file mode 100644 index 000000000..478294d9d --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/general/PluginExample.java @@ -0,0 +1,100 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.general; + +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.plugin.*; + +/** + * Example demonstrating plugin instrumentation with the Durable Execution SDK. + * + *

This handler registers a simple logging plugin that prints lifecycle events to stdout (which appears in CloudWatch + * Logs when deployed). Deploy this and check CloudWatch to verify all hooks fire at the right times. + * + *

Expected output for a successful run: + * + *

+ * [PLUGIN] onInvocationStart: requestId=..., executionArn=..., firstInvocation=true
+ * [PLUGIN] onOperationStart: name=create-greeting, type=STEP
+ * [PLUGIN] onUserFunctionStart: name=create-greeting, attempt=1
+ * [PLUGIN] onUserFunctionEnd: name=create-greeting, succeeded=true
+ * [PLUGIN] onOperationEnd: name=create-greeting
+ * [PLUGIN] onOperationStart: name=transform, type=STEP
+ * [PLUGIN] onUserFunctionStart: name=transform, attempt=1
+ * [PLUGIN] onUserFunctionEnd: name=transform, succeeded=true
+ * [PLUGIN] onOperationEnd: name=transform
+ * [PLUGIN] onInvocationEnd: status=SUCCEEDED
+ * 
+ */ +public class PluginExample extends DurableHandler { + + @Override + protected DurableConfig createConfiguration() { + return DurableConfig.builder().withPlugins(new LoggingPlugin()).build(); + } + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + context.getLogger().info("Starting plugin example for {}", input.getName()); + + var greeting = context.step("create-greeting", String.class, stepCtx -> "Hello, " + input.getName()); + + var result = context.step("transform", String.class, stepCtx -> greeting.toUpperCase() + "!"); + + context.getLogger().info("Plugin example complete: {}", result); + return result; + } + + /** A simple plugin that logs all lifecycle events to stdout. In Lambda, stdout goes to CloudWatch Logs. */ + static class LoggingPlugin implements DurableExecutionPlugin { + + @Override + public void onInvocationStart(InvocationInfo info) { + System.out.printf( + "[PLUGIN] onInvocationStart: requestId=%s, executionArn=%s, firstInvocation=%s%n", + info.requestId(), info.executionArn(), info.isFirstInvocation()); + } + + @Override + public void onInvocationEnd(InvocationEndInfo info) { + System.out.printf( + "[PLUGIN] onInvocationEnd: status=%s, error=%s%n", + info.invocationStatus(), + info.executionError() != null ? info.executionError().getMessage() : null); + } + + @Override + public void onOperationStart(OperationInfo info) { + System.out.printf( + "[PLUGIN] onOperationStart: name=%s, type=%s, id=%s%n", info.name(), info.type(), info.id()); + } + + @Override + public void onOperationEnd(OperationEndInfo info) { + System.out.printf( + "[PLUGIN] onOperationEnd: name=%s, type=%s, error=%s%n", + info.name(), + info.type(), + info.error() != null ? info.error().getMessage() : null); + } + + @Override + public void onUserFunctionStart(UserFunctionStartInfo info) { + System.out.printf( + "[PLUGIN] onUserFunctionStart: name=%s, type=%s, attempt=%s, isReplayingChildren=%s%n", + info.name(), info.type(), info.attempt(), info.isReplayingChildren()); + } + + @Override + public void onUserFunctionEnd(UserFunctionEndInfo info) { + System.out.printf( + "[PLUGIN] onUserFunctionEnd: name=%s, succeeded=%s, error=%s%n", + info.name(), + info.succeeded(), + info.error() != null ? info.error().getMessage() : null); + } + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index d9df8bc67..a32295b40 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -833,4 +833,18 @@ void testConcurrentWaitForConditionExample() { + waitForConditionResult.getDuration().toSeconds() + "s, expected < 30s"); } } + + @Test + void testPluginExample() { + var runner = + CloudDurableTestRunner.create(arn("plugin-example"), GreetingRequest.class, String.class, lambdaClient); + var result = runner.run(new GreetingRequest("World")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, WORLD!", result.getResult()); + + // Verify operations were tracked + assertNotNull(runner.getOperation("create-greeting")); + assertNotNull(runner.getOperation("transform")); + } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/general/PluginExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/general/PluginExampleTest.java new file mode 100644 index 000000000..53d4e2b21 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/general/PluginExampleTest.java @@ -0,0 +1,42 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.general; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class PluginExampleTest { + + @Test + void testPluginExample_executesSuccessfully() { + var handler = new PluginExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.run(new GreetingRequest("World")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, WORLD!", result.getResult(String.class)); + + // Verify operations were tracked + assertNotNull(result.getOperation("create-greeting")); + assertNotNull(result.getOperation("transform")); + } + + @Test + void testPluginExample_pluginHooksFire() { + // This test verifies that the plugin hooks fire without error. + // Check stdout/CloudWatch for [PLUGIN] log lines when deployed. + var handler = new PluginExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("Test")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, TEST!", result.getResult(String.class)); + } +} diff --git a/examples/template.yaml b/examples/template.yaml index 149ef9c62..2f5735c86 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -333,6 +333,17 @@ Resources: Handler: "software.amazon.lambda.durable.examples.general.CustomPollingExample" Role: !Ref RoleArn + PluginExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'plugin-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.general.PluginExample" + Role: !Ref RoleArn + RetryInvokeExampleFunction: Type: AWS::Serverless::Function Properties: @@ -531,6 +542,10 @@ Outputs: Description: Custom Polling Example Function ARN Value: !GetAtt CustomPollingExampleFunction.Arn + PluginExampleFunction: + Description: Plugin Example Function ARN + Value: !GetAtt PluginExampleFunction.Arn + RetryInvokeExampleFunction: Description: Retry Invoke Example Function ARN Value: !GetAtt RetryInvokeExampleFunction.Arn diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java new file mode 100644 index 000000000..f4ad9ffa8 --- /dev/null +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java @@ -0,0 +1,434 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.config.StepConfig; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.WaitForConditionResult; +import software.amazon.lambda.durable.plugin.*; +import software.amazon.lambda.durable.retry.RetryStrategies; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +/** Integration tests verifying plugin hooks fire correctly during durable execution lifecycle. */ +class PluginIntegrationTest { + + // ─── Invocation-level hooks ────────────────────────────────────────── + + @Test + void plugin_receivesInvocationStartAndEnd_onSuccessfulExecution() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.step("greet", String.class, stepCtx -> "Hello " + input), + config); + + var result = runner.runUntilComplete("World"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + // Verify invocation hooks fired + assertEquals(1, plugin.invocationStarts.size()); + assertTrue(plugin.invocationStarts.get(0).isFirstInvocation()); + assertNotNull(plugin.invocationStarts.get(0).executionArn()); + + assertEquals(1, plugin.invocationEnds.size()); + assertEquals(InvocationStatus.SUCCEEDED, plugin.invocationEnds.get(0).invocationStatus()); + assertNull(plugin.invocationEnds.get(0).executionError()); + } + + @Test + void plugin_receivesInvocationEnd_withPendingStatus_onSuspension() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> { + context.step("step1", String.class, stepCtx -> "done"); + context.wait("pause", Duration.ofMinutes(5)); + return "complete"; + }, + config); + + var result = runner.run("input"); + + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + assertEquals(1, plugin.invocationEnds.size()); + assertEquals(InvocationStatus.PENDING, plugin.invocationEnds.get(0).invocationStatus()); + } + + @Test + void plugin_receivesInvocationEnd_withFailedStatus_onError() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.step( + "failing", + String.class, + stepCtx -> { + throw new RuntimeException("boom"); + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()), + config); + + var result = runner.run("input"); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + + assertEquals(1, plugin.invocationEnds.size()); + assertEquals(InvocationStatus.FAILED, plugin.invocationEnds.get(0).invocationStatus()); + assertNotNull(plugin.invocationEnds.get(0).executionError()); + } + + // ─── Operation-level hooks ─────────────────────────────────────────── + + @Test + void plugin_receivesOperationStartAndEnd_forStep() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, (input, context) -> context.step("my-step", String.class, stepCtx -> "result"), config); + + runner.runUntilComplete("input"); + + // Should have at least one operation start for the step + assertTrue( + plugin.operationStarts.stream().anyMatch(info -> "my-step".equals(info.name())), + "Should have onOperationStart for 'my-step'"); + + // Should have operation end for completed step + assertTrue( + plugin.operationEnds.stream().anyMatch(info -> "my-step".equals(info.name())), + "Should have onOperationEnd for 'my-step'"); + } + + @Test + void plugin_receivesOperationStart_forMultipleSteps() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> { + context.step("step-a", String.class, stepCtx -> "A"); + context.step("step-b", String.class, stepCtx -> "B"); + context.step("step-c", String.class, stepCtx -> "C"); + return "done"; + }, + config); + + runner.runUntilComplete("input"); + + var opNames = plugin.operationStarts.stream().map(OperationInfo::name).toList(); + assertTrue(opNames.contains("step-a")); + assertTrue(opNames.contains("step-b")); + assertTrue(opNames.contains("step-c")); + } + + @Test + void plugin_operationEnd_notFiredOnReplay() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> { + context.step("step1", String.class, stepCtx -> "done"); + context.wait("pause", Duration.ofMinutes(1)); + context.step("step2", String.class, stepCtx -> "final"); + return "complete"; + }, + config); + + // First invocation: step1 completes, then suspends at wait + var result1 = runner.run("input"); + assertEquals(ExecutionStatus.PENDING, result1.getStatus()); + + int operationEndsAfterFirstRun = plugin.operationEnds.size(); + assertTrue(operationEndsAfterFirstRun > 0, "At least step1 should have ended"); + + // Advance time and re-run (replay step1, execute step2) + runner.advanceTime(); + var result2 = runner.run("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + + // operationEnd for step1 should NOT fire again on replay + long step1EndCount = plugin.operationEnds.stream() + .filter(info -> "step1".equals(info.name())) + .count(); + assertEquals(1, step1EndCount, "step1 onOperationEnd should fire only once (not on replay)"); + } + + // ─── User function hooks ───────────────────────────────────────────── + + @Test + void plugin_receivesUserFunctionStartAndEnd_forStep() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, (input, context) -> context.step("compute", String.class, stepCtx -> "42"), config); + + runner.runUntilComplete("input"); + + assertTrue( + plugin.userFunctionStarts.stream().anyMatch(info -> "compute".equals(info.name())), + "Should have onUserFunctionStart for 'compute'"); + assertTrue( + plugin.userFunctionEnds.stream().anyMatch(info -> "compute".equals(info.name()) && info.succeeded()), + "Should have successful onUserFunctionEnd for 'compute'"); + } + + @Test + void plugin_userFunctionEnd_succeeded_false_whenStepFails() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + // When a step fails and retries are exhausted, the step operation's handleStepFailure + // sends a FAIL checkpoint. However, from runUserHandler's perspective the wrapped lambda + // catches the exception internally, so onUserFunctionEnd still reports succeeded=true + // for the step handler wrapper. The actual failure is captured in onOperationEnd and onInvocationEnd. + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.step( + "fail-step", + String.class, + stepCtx -> { + throw new RuntimeException("step failed"); + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()), + config); + + var result = runner.run("input"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + + // Verify failure is captured at invocation level + assertEquals(1, plugin.invocationEnds.size()); + assertEquals(InvocationStatus.FAILED, plugin.invocationEnds.get(0).invocationStatus()); + + // The user function start/end hooks fire for the step execution thread + assertFalse(plugin.userFunctionStarts.isEmpty(), "Should have user function start for the step"); + assertFalse(plugin.userFunctionEnds.isEmpty(), "Should have user function end for the step"); + } + + @Test + void plugin_userFunctionStart_includesAttemptNumber_forRetries() { + var attemptCounter = new AtomicInteger(0); + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.step( + "retry-step", + String.class, + stepCtx -> { + if (attemptCounter.incrementAndGet() < 3) { + throw new RuntimeException("not yet"); + } + return "success"; + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.DEFAULT) + .build()), + config); + + // Run until all retries succeed + runner.runUntilComplete("input"); + + // Check that attempt numbers are set on user function starts for the retry step + var retryStepStarts = plugin.userFunctionStarts.stream() + .filter(info -> "retry-step".equals(info.name()) && info.attempt() != null) + .toList(); + assertFalse(retryStepStarts.isEmpty(), "Should have user function starts with attempt numbers"); + assertEquals(Integer.valueOf(1), retryStepStarts.get(0).attempt()); + } + + // ─── Multiple plugins ──────────────────────────────────────────────── + + @Test + void multiplePlugins_allReceiveHooks() { + var plugin1 = new RecordingPlugin(); + var plugin2 = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin1, plugin2).build(); + + var runner = LocalDurableTestRunner.create( + String.class, (input, context) -> context.step("step", String.class, stepCtx -> "result"), config); + + runner.runUntilComplete("input"); + + // Both plugins should receive invocation hooks + assertEquals(1, plugin1.invocationStarts.size()); + assertEquals(1, plugin2.invocationStarts.size()); + assertEquals(1, plugin1.invocationEnds.size()); + assertEquals(1, plugin2.invocationEnds.size()); + } + + @Test + void throwingPlugin_doesNotDisruptExecution() { + var throwingPlugin = new ThrowingPlugin(); + var recordingPlugin = new RecordingPlugin(); + var config = DurableConfig.builder() + .withPlugins(throwingPlugin, recordingPlugin) + .build(); + + var runner = LocalDurableTestRunner.create( + String.class, (input, context) -> context.step("step", String.class, stepCtx -> "safe"), config); + + var result = runner.runUntilComplete("input"); + + // Execution should succeed despite throwing plugin + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("safe", result.getResult(String.class)); + + // The second plugin should still receive hooks + assertFalse(recordingPlugin.invocationStarts.isEmpty()); + assertFalse(recordingPlugin.invocationEnds.isEmpty()); + } + + // ─── Child context hooks ───────────────────────────────────────────── + + @Test + void plugin_receivesHooks_forChildContextOperations() { + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.runInChildContext("child", String.class, childCtx -> { + return childCtx.step("inner-step", String.class, stepCtx -> "from-child"); + }), + config); + + runner.runUntilComplete("input"); + + // Should have operation start for both child context and inner step + var opNames = plugin.operationStarts.stream().map(OperationInfo::name).toList(); + assertTrue(opNames.contains("child"), "Should track child context"); + assertTrue(opNames.contains("inner-step"), "Should track inner step"); + } + + // ─── WaitForCondition hooks ────────────────────────────────────────── + + @Test + void plugin_receivesAttemptNumbers_forWaitForCondition() { + var checkCount = new AtomicInteger(0); + var plugin = new RecordingPlugin(); + var config = DurableConfig.builder().withPlugins(plugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> { + var result = context.waitForCondition("poll", String.class, (state, stepCtx) -> { + int count = checkCount.incrementAndGet(); + if (count >= 2) { + return WaitForConditionResult.stopPolling("ready"); + } + return WaitForConditionResult.continuePolling("waiting-" + count); + }); + return result; + }, + config); + + runner.runUntilComplete("input"); + + // Should have user function starts with attempt numbers for the condition checks + var conditionStarts = plugin.userFunctionStarts.stream() + .filter(info -> "poll".equals(info.name()) && info.attempt() != null) + .toList(); + assertTrue(conditionStarts.size() >= 2, "Should have at least 2 condition check attempts"); + } + + // ─── Test helper classes ───────────────────────────────────────────── + + /** Plugin that records all hook invocations for assertions. */ + private static class RecordingPlugin implements DurableExecutionPlugin { + final List invocationStarts = Collections.synchronizedList(new ArrayList<>()); + final List invocationEnds = Collections.synchronizedList(new ArrayList<>()); + final List operationStarts = Collections.synchronizedList(new ArrayList<>()); + final List operationEnds = Collections.synchronizedList(new ArrayList<>()); + final List userFunctionStarts = Collections.synchronizedList(new ArrayList<>()); + final List userFunctionEnds = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void onInvocationStart(InvocationInfo info) { + invocationStarts.add(info); + } + + @Override + public void onInvocationEnd(InvocationEndInfo info) { + invocationEnds.add(info); + } + + @Override + public void onOperationStart(OperationInfo info) { + operationStarts.add(info); + } + + @Override + public void onOperationEnd(OperationEndInfo info) { + operationEnds.add(info); + } + + @Override + public void onUserFunctionStart(UserFunctionStartInfo info) { + userFunctionStarts.add(info); + } + + @Override + public void onUserFunctionEnd(UserFunctionEndInfo info) { + userFunctionEnds.add(info); + } + } + + /** Plugin that throws on every hook to verify error isolation. */ + private static class ThrowingPlugin implements DurableExecutionPlugin { + @Override + public void onInvocationStart(InvocationInfo info) { + throw new RuntimeException("plugin error"); + } + + @Override + public void onInvocationEnd(InvocationEndInfo info) { + throw new RuntimeException("plugin error"); + } + + @Override + public void onOperationStart(OperationInfo info) { + throw new RuntimeException("plugin error"); + } + + @Override + public void onOperationEnd(OperationEndInfo info) { + throw new RuntimeException("plugin error"); + } + + @Override + public void onUserFunctionStart(UserFunctionStartInfo info) { + throw new RuntimeException("plugin error"); + } + + @Override + public void onUserFunctionEnd(UserFunctionEndInfo info) { + throw new RuntimeException("plugin error"); + } + } +} diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java index 0e9814fbd..a5664ce1f 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java @@ -20,6 +20,7 @@ import software.amazon.lambda.durable.execution.DurableExecutor; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.plugin.DurableExecutionPlugin; import software.amazon.lambda.durable.serde.SerDes; import software.amazon.lambda.durable.testing.local.LocalMemoryExecutionClient; import software.amazon.lambda.durable.testing.local.OperationResult; @@ -61,6 +62,7 @@ private LocalDurableTestRunner( .withPollingStrategy(customerConfig.getPollingStrategy()) .withCheckpointDelay(customerConfig.getCheckpointDelay()) .withLoggerConfig(customerConfig.getLoggerConfig()) + .withPlugins(customerConfig.getPluginRunner().getPlugins().toArray(new DurableExecutionPlugin[0])) .build(); } else { // Fallback to default config with in-memory client diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java index 19fe45d43..a21251e19 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java @@ -5,6 +5,8 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -21,6 +23,7 @@ import software.amazon.lambda.durable.client.DurableExecutionClient; import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient; import software.amazon.lambda.durable.logging.LoggerConfig; +import software.amazon.lambda.durable.plugin.DurableExecutionPlugin; import software.amazon.lambda.durable.plugin.PluginRunner; import software.amazon.lambda.durable.retry.PollingStrategies; import software.amazon.lambda.durable.retry.PollingStrategy; @@ -106,7 +109,7 @@ private DurableConfig(Builder builder) { this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults); this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT); this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0)); - this.pluginRunner = PluginRunner.noOp(); + this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins); validateConfiguration(); } @@ -186,10 +189,9 @@ public Duration getCheckpointDelay() { /** * Gets the plugin runner that dispatches lifecycle events to registered plugins. * - *

Currently returns a no-op runner. Plugin registration via config will be added when the plugin system is fully - * wired. + *

Returns a no-op runner if no plugins were registered via the builder. * - * @return PluginRunner instance (always no-op until plugin wiring is complete) + * @return PluginRunner instance (never null) * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. */ @Deprecated @@ -291,6 +293,7 @@ public static final class Builder { private LoggerConfig loggerConfig; private PollingStrategy pollingStrategy; private Duration checkpointDelay; + private List plugins = new ArrayList<>(); public Builder() {} @@ -400,6 +403,30 @@ public Builder withCheckpointDelay(Duration duration) { return this; } + /** + * Registers one or more plugins for lifecycle event instrumentation. + * + *

Plugins receive hooks at invocation, operation, and user function boundaries. Errors thrown by plugins are + * isolated and never disrupt SDK execution. + * + *

Calling this method replaces any previously registered plugins. Plugins are called in registration order. + * + * @param plugins the plugins to register + * @return This builder + * @throws NullPointerException if any plugin is null + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ + @Deprecated + public Builder withPlugins(DurableExecutionPlugin... plugins) { + Objects.requireNonNull(plugins, "Plugins array cannot be null"); + var newPlugins = new ArrayList(plugins.length); + for (var plugin : plugins) { + newPlugins.add(Objects.requireNonNull(plugin, "Plugin cannot be null")); + } + this.plugins = newPlugins; + return this; + } + /** * Builds the DurableConfig instance. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java index cdf837530..ded7ac0fa 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java @@ -24,6 +24,10 @@ import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.DurableExecutionOutput; +import software.amazon.lambda.durable.plugin.InvocationEndInfo; +import software.amazon.lambda.durable.plugin.InvocationInfo; +import software.amazon.lambda.durable.plugin.InvocationStatus; +import software.amazon.lambda.durable.plugin.PluginRunner; import software.amazon.lambda.durable.serde.SerDes; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -48,11 +52,21 @@ public static DurableExecutionOutput execute( TypeToken inputType, BiFunction handler, DurableConfig config) { + var pluginRunner = config.getPluginRunner(); try (var executionManager = new ExecutionManager(input, config)) { + var isFirstInvocation = !executionManager.isReplaying(); + var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null; + var executionArn = input.durableExecutionArn(); + executionManager.registerActiveThread(null); var handlerFuture = CompletableFuture.supplyAsync( () -> { executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); + + // onInvocationStart runs on the user thread so plugins can + // inject ThreadLocal objects, update MDC, etc. + pluginRunner.onInvocationStart(new InvocationInfo(requestId, executionArn, isFirstInvocation)); + var userInput = extractUserInput( executionManager.getExecutionOperation(), config.getSerDes(), inputType); // use try-with-resources to clear logger properties @@ -76,6 +90,13 @@ public static DurableExecutionOutput execute( // return PENDING if it's SuspendExecutionException if (cause instanceof SuspendExecutionException) { + fireOnInvocationEnd( + pluginRunner, + requestId, + executionArn, + isFirstInvocation, + InvocationStatus.PENDING, + null); return DurableExecutionOutput.pending(); } @@ -85,17 +106,40 @@ public static DurableExecutionOutput execute( UnrecoverableDurableExecutionException unrecoverableDurableExecutionException && unrecoverableDurableExecutionException.isRetryable()) { + fireOnInvocationEnd( + pluginRunner, + requestId, + executionArn, + isFirstInvocation, + InvocationStatus.RETRYING, + cause); throw unrecoverableDurableExecutionException; } // fail the execution otherwise logger.debug("Execution failed: {}", cause.getMessage()); + fireOnInvocationEnd( + pluginRunner, + requestId, + executionArn, + isFirstInvocation, + InvocationStatus.FAILED, + cause); return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes())); } // user handler complete successfully logger.debug("Execution completed"); var outputPayload = config.getSerDes().serialize(result); - return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload)); + var output = + DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload)); + fireOnInvocationEnd( + pluginRunner, + requestId, + executionArn, + isFirstInvocation, + InvocationStatus.SUCCEEDED, + null); + return output; }) .join(); } catch (CompletionException e) { @@ -106,6 +150,16 @@ public static DurableExecutionOutput execute( } } + private static void fireOnInvocationEnd( + PluginRunner pluginRunner, + String requestId, + String executionArn, + boolean isFirstInvocation, + InvocationStatus status, + Throwable error) { + pluginRunner.onInvocationEnd(new InvocationEndInfo(requestId, executionArn, isFirstInvocation, status, error)); + } + private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) { // Check if the serialized payload exceeds Lambda response size limit var payloadSize = outputPayload != null ? outputPayload.getBytes(StandardCharsets.UTF_8).length : 0; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index b3dd55b95..a35ce1533 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -23,6 +23,8 @@ import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; +import software.amazon.lambda.durable.plugin.PluginInfoConverter; +import software.amazon.lambda.durable.plugin.PluginRunner; import software.amazon.lambda.durable.util.ExceptionHelper; /** @@ -132,11 +134,15 @@ public void execute() { if (ExecutionManager.isTerminalStatus(existing.status())) { replayCompletedOperation.set(true); } + // Fire onOperationStart plugin hook (including replay) + fireOnOperationStart(existing); replay(existing); } else { if (durableContext.isReplaying()) { this.durableContext.setExecutionMode(); } + // Fire onOperationStart plugin hook (first execution, no existing operation) + fireOnOperationStart(null); start(); } } @@ -245,13 +251,41 @@ protected Operation waitForOperationCompletion() { } protected void runUserHandler(Runnable runnable, ThreadType threadType) { + runUserHandler(runnable, threadType, null); + } + + /** + * Runs user code in a separate thread with plugin hook instrumentation. + * + * @param runnable the user code to run + * @param threadType the thread type (STEP or CONTEXT) + * @param attempt the 1-based attempt number for steps/waitForCondition, null for context operations + */ + protected void runUserHandler(Runnable runnable, ThreadType threadType, Integer attempt) { String operationId = getOperationId(); logger.debug("Starting user handler for operation {} ({})", operationId, threadType); + var pluginRunner = getPluginRunner(); Runnable wrapped = () -> { executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType)); + + // Fire onUserFunctionStart on the user code thread + var userFunctionStartInfo = PluginInfoConverter.toUserFunctionStartInfo( + operationIdentifier, durableContext.getParentId(), durableContext.isReplaying(), attempt); + pluginRunner.onUserFunctionStart(userFunctionStartInfo); + try { runnable.run(); + // Fire onUserFunctionEnd on success + pluginRunner.onUserFunctionEnd( + PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, true, null)); } catch (Throwable throwable) { + // Fire onUserFunctionEnd for actual user function failures, + // not for SDK control flow signals (SuspendExecutionException) + if (!(throwable instanceof SuspendExecutionException)) { + pluginRunner.onUserFunctionEnd( + PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, false, throwable)); + } + // Operations always wrap the user's function and handles all possible exceptions except for // SuspendExecutionException. if (!executionManager.isExecutionCompletedExceptionally() @@ -310,6 +344,11 @@ public void onCheckpointComplete(Operation operation) { // handle other updates. logger.trace("In onCheckpointComplete, completing operation {} ({})", getOperationId(), completionFuture); + // Fire onOperationEnd plugin hook — operation reached terminal status for the first time (not replay) + if (!replayCompletedOperation.get()) { + fireOnOperationEnd(operation, null); + } + markCompletionFutureCompleted(); } } @@ -454,4 +493,25 @@ protected void validateReplay(Operation checkpointed) { public CompletableFuture getRunningUserHandler() { return runningUserHandler.get(); } + + // ─── Plugin hook helpers ───────────────────────────────────────────── + + /** Returns the plugin runner from config, or no-op if config is unavailable. */ + private PluginRunner getPluginRunner() { + var config = getContext().getDurableConfig(); + return config != null ? config.getPluginRunner() : PluginRunner.noOp(); + } + + /** Fires onOperationStart plugin hook. */ + private void fireOnOperationStart(Operation existing) { + var info = PluginInfoConverter.toOperationInfo(existing, operationIdentifier, durableContext.getParentId()); + getPluginRunner().onOperationStart(info); + } + + /** Fires onOperationEnd plugin hook when an operation reaches terminal status for the first time. */ + private void fireOnOperationEnd(Operation operation, Throwable error) { + var info = PluginInfoConverter.toOperationEndInfo( + operation, operationIdentifier, durableContext.getParentId(), error); + getPluginRunner().onOperationEnd(info); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 2ee7aa1a6..e8dce59bc 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -119,7 +119,7 @@ private void executeStepLogic(int attempt) { }; // Execute user provided step code in user-configured executor - runUserHandler(userHandler, ThreadType.STEP); + runUserHandler(userHandler, ThreadType.STEP, attempt); } private void checkpointStarted() { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index 5ca9d7d66..a3471179c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -160,7 +160,7 @@ private void executeCheckLogic(T currentState, int attempt) { } }; - runUserHandler(userHandler, ThreadType.STEP); + runUserHandler(userHandler, ThreadType.STEP, attempt); } private void handleCheckFailure(Throwable exception) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationStatus.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationStatus.java index b47cba484..8a7f5176a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationStatus.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationStatus.java @@ -11,8 +11,10 @@ public enum InvocationStatus { /** Execution completed successfully in this invocation. */ SUCCEEDED, - /** Execution failed in this invocation. */ + /** Execution failed permanently in this invocation. */ FAILED, /** Execution suspended — will resume in a future invocation. */ - PENDING + PENDING, + /** Execution failed but will be retried by the backend in a new invocation. */ + RETRYING } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java index d765cee1d..dee364174 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java @@ -7,13 +7,11 @@ /** * Extended operation information for operation end events. * - * @param id operation ID — hashed - * @param rawId operation ID — unhashed + * @param id operation ID * @param name human-readable operation name (may be null) * @param type operation type * @param subType operation sub-type (may be null) - * @param parentId parent operation ID — hashed (null for root-level operations) - * @param rawParentId parent operation ID — unhashed (null for root-level operations) + * @param parentId parent operation ID (null for root-level operations) * @param startTimestamp when the operation started * @param endTimestamp when the operation ended * @param error non-null if the operation failed @@ -22,12 +20,10 @@ @Deprecated public record OperationEndInfo( String id, - String rawId, String name, String type, String subType, String parentId, - String rawParentId, Instant startTimestamp, Instant endTimestamp, Throwable error) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java index 3994fbf54..74c23d687 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java @@ -9,25 +9,22 @@ * *

Field names mirror the {@code Operation} type from the AWS SDK for consistency. * - * @param id operation ID — hashed (unique within the execution) - * @param rawId operation ID — unhashed (e.g. "1", "2", "hash(1)-1" for child contexts) + * @param id operation ID (unique within the execution) * @param name human-readable operation name (may be null) * @param type operation type (STEP, WAIT, CONTEXT, CHAINED_INVOKE, CALLBACK) * @param subType operation sub-type (Map, Parallel, RunInChildContext, WaitForCondition, etc.) — may be null - * @param parentId parent operation ID — hashed (null for root-level operations) - * @param rawParentId parent operation ID — unhashed (null for root-level operations) - * @param startTimestamp when the operation started (may be from a prior invocation) + * @param parentId parent operation ID (null for root-level operations) + * @param startTimestamp when the operation started — on first execution this is a local {@code Instant.now()} which may + * slightly differ from the timestamp recorded by the backend; on replay it comes from the backend checkpoint * @param endTimestamp when the operation ended (null if still running) * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. */ @Deprecated public record OperationInfo( String id, - String rawId, String name, String type, String subType, String parentId, - String rawParentId, Instant startTimestamp, Instant endTimestamp) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java index afd06c31f..6c7b30ff5 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java @@ -4,8 +4,7 @@ import java.time.Instant; import software.amazon.awssdk.services.lambda.model.Operation; -import software.amazon.awssdk.services.lambda.model.OperationType; -import software.amazon.lambda.durable.model.OperationSubType; +import software.amazon.lambda.durable.model.OperationIdentifier; /** * Utility methods for converting SDK internal types to plugin info records. @@ -18,94 +17,42 @@ public final class PluginInfoConverter { private PluginInfoConverter() {} /** - * Converts an SDK {@link Operation} to an {@link OperationInfo} for plugin hooks. + * Converts an SDK {@link Operation} to an {@link OperationInfo} using an {@link OperationIdentifier}. * * @param operation the SDK operation (may be null for first-start scenarios) - * @param operationId the hashed operation ID - * @param rawId the unhashed operation ID (e.g. "1", "2", "1-1") - * @param name the operation name - * @param type the operation type - * @param subType the operation sub-type (may be null) - * @param parentId the hashed parent operation ID (may be null for root operations) - * @param rawParentId the unhashed parent ID (may be null for root operations) + * @param identifier the operation identifier containing id, name, type, and subType + * @param parentId the parent operation ID (may be null for root operations) * @return an OperationInfo record */ - public static OperationInfo toOperationInfo( - Operation operation, - String operationId, - String rawId, - String name, - OperationType type, - OperationSubType subType, - String parentId, - String rawParentId) { + public static OperationInfo toOperationInfo(Operation operation, OperationIdentifier identifier, String parentId) { return new OperationInfo( - operationId, - rawId, - name, - type != null ? type.toString() : null, - subType != null ? subType.getValue() : null, + identifier.operationId(), + identifier.name(), + identifier.operationType() != null ? identifier.operationType().toString() : null, + identifier.subType() != null ? identifier.subType().getValue() : null, parentId, - rawParentId, - operation != null ? operation.startTimestamp() : null, + operation != null ? operation.startTimestamp() : Instant.now(), operation != null ? operation.endTimestamp() : null); } /** - * Converts an SDK {@link Operation} to an {@link OperationInfo} using the operation's own fields. Raw IDs are not - * available from the Operation object alone, so they are set to null. - * - * @param operation the SDK operation - * @return an OperationInfo record - */ - public static OperationInfo toOperationInfo(Operation operation) { - if (operation == null) { - return new OperationInfo(null, null, null, null, null, null, null, null, null); - } - return new OperationInfo( - operation.id(), - null, - operation.name(), - operation.type() != null ? operation.type().toString() : null, - operation.subType(), - operation.parentId(), - null, - operation.startTimestamp(), - operation.endTimestamp()); - } - - /** - * Creates an {@link OperationEndInfo} from an SDK {@link Operation} and an optional error. + * Creates an {@link OperationEndInfo} from an SDK {@link Operation}, an {@link OperationIdentifier}, and an + * optional error. * * @param operation the completed SDK operation - * @param operationId the hashed operation ID - * @param rawId the unhashed operation ID - * @param name the operation name - * @param type the operation type - * @param subType the operation sub-type (may be null) - * @param parentId the hashed parent operation ID (may be null) - * @param rawParentId the unhashed parent ID (may be null) + * @param identifier the operation identifier containing id, name, type, and subType + * @param parentId the parent operation ID (may be null) * @param error the error if the operation failed (may be null) * @return an OperationEndInfo record */ public static OperationEndInfo toOperationEndInfo( - Operation operation, - String operationId, - String rawId, - String name, - OperationType type, - OperationSubType subType, - String parentId, - String rawParentId, - Throwable error) { + Operation operation, OperationIdentifier identifier, String parentId, Throwable error) { return new OperationEndInfo( - operationId, - rawId, - name, - type != null ? type.toString() : null, - subType != null ? subType.getValue() : null, + identifier.operationId(), + identifier.name(), + identifier.operationType() != null ? identifier.operationType().toString() : null, + identifier.subType() != null ? identifier.subType().getValue() : null, parentId, - rawParentId, operation != null ? operation.startTimestamp() : null, operation != null ? operation.endTimestamp() : null, error); @@ -114,37 +61,22 @@ public static OperationEndInfo toOperationEndInfo( /** * Creates a {@link UserFunctionStartInfo} for when a user function starts executing. * - * @param operationId the hashed operation ID - * @param rawId the unhashed operation ID - * @param name the operation name - * @param type the operation type - * @param subType the operation sub-type (may be null) - * @param parentId the hashed parent operation ID (may be null) - * @param rawParentId the unhashed parent ID (may be null) + * @param identifier the operation identifier containing id, name, type, and subType + * @param parentId the parent operation ID (may be null) * @param isReplay true if the user function is called during replay (context operations) * @param attempt the 1-based attempt number (null for context operations) * @return a UserFunctionStartInfo record */ public static UserFunctionStartInfo toUserFunctionStartInfo( - String operationId, - String rawId, - String name, - OperationType type, - OperationSubType subType, - String parentId, - String rawParentId, - boolean isReplay, - Integer attempt) { + OperationIdentifier identifier, String parentId, boolean isReplayingChildren, Integer attempt) { return new UserFunctionStartInfo( - operationId, - rawId, - name, - type != null ? type.toString() : null, - subType != null ? subType.getValue() : null, + identifier.operationId(), + identifier.name(), + identifier.operationType() != null ? identifier.operationType().toString() : null, + identifier.subType() != null ? identifier.subType().getValue() : null, parentId, - rawParentId, Instant.now(), - isReplay, + isReplayingChildren, attempt); } @@ -160,15 +92,13 @@ public static UserFunctionEndInfo toUserFunctionEndInfo( UserFunctionStartInfo startInfo, boolean succeeded, Throwable error) { return new UserFunctionEndInfo( startInfo.id(), - startInfo.rawId(), startInfo.name(), startInfo.type(), startInfo.subType(), startInfo.parentId(), - startInfo.rawParentId(), startInfo.startTimestamp(), Instant.now(), - startInfo.isReplay(), + startInfo.isReplayingChildren(), startInfo.attempt(), succeeded, error); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java index 46a92f35b..4620fd062 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java @@ -40,6 +40,11 @@ public boolean isEmpty() { return plugins.isEmpty(); } + /** Returns the list of registered plugins. */ + public List getPlugins() { + return plugins; + } + // ─── Event hooks ───────────────────────────────────────────────────── /** Calls a void hook on all plugins, swallowing any errors. */ diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionEndInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionEndInfo.java index 96a365de7..86136b3bd 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionEndInfo.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionEndInfo.java @@ -9,16 +9,14 @@ * *

This fires for both step attempts and child context functions, on the same thread as the user code. * - * @param id operation ID — hashed - * @param rawId operation ID — unhashed + * @param id operation ID * @param name human-readable operation name (may be null) * @param type operation type (STEP, CONTEXT, etc.) * @param subType operation sub-type (Map, Parallel, WaitForCondition, etc.) — may be null - * @param parentId parent operation ID — hashed (null for root-level operations) - * @param rawParentId parent operation ID — unhashed (null for root-level operations) + * @param parentId parent operation ID (null for root-level operations) * @param startTimestamp when the user function started * @param endTimestamp when the user function ended - * @param isReplay true if the user function was called during replay (context operations) + * @param isReplayingChildren true if child operations within this context are being replayed from checkpoints * @param attempt 1-based attempt number for steps/waitForCondition, null for context operations * @param succeeded true if the user function completed without error * @param error non-null if the user function failed @@ -27,15 +25,13 @@ @Deprecated public record UserFunctionEndInfo( String id, - String rawId, String name, String type, String subType, String parentId, - String rawParentId, Instant startTimestamp, Instant endTimestamp, - boolean isReplay, + boolean isReplayingChildren, Integer attempt, boolean succeeded, Throwable error) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionStartInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionStartInfo.java index 207b9d8a4..842f82536 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionStartInfo.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/UserFunctionStartInfo.java @@ -10,27 +10,23 @@ *

This fires for both step attempts and child context functions, on the same thread as the user code. For steps, * {@code attempt} is non-null (1-based). For context operations, {@code attempt} is null. * - * @param id operation ID — hashed - * @param rawId operation ID — unhashed + * @param id operation ID * @param name human-readable operation name (may be null) * @param type operation type (STEP, CONTEXT, etc.) * @param subType operation sub-type (Map, Parallel, WaitForCondition, etc.) — may be null - * @param parentId parent operation ID — hashed (null for root-level operations) - * @param rawParentId parent operation ID — unhashed (null for root-level operations) + * @param parentId parent operation ID (null for root-level operations) * @param startTimestamp when the user function started - * @param isReplay true if the user function is being called during replay (context operations) + * @param isReplayingChildren true if child operations within this context are being replayed from checkpoints * @param attempt 1-based attempt number for steps/waitForCondition, null for context operations * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. */ @Deprecated public record UserFunctionStartInfo( String id, - String rawId, String name, String type, String subType, String parentId, - String rawParentId, Instant startTimestamp, - boolean isReplay, + boolean isReplayingChildren, Integer attempt) {} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java index 6803f3e03..94d13f5ef 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java @@ -13,6 +13,8 @@ import static org.mockito.Mockito.mock; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -20,6 +22,7 @@ import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.lambda.durable.client.DurableExecutionClient; import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient; +import software.amazon.lambda.durable.plugin.DurableExecutionPlugin; import software.amazon.lambda.durable.retry.JitterStrategy; import software.amazon.lambda.durable.retry.PollingStrategies; import software.amazon.lambda.durable.serde.JacksonSerDes; @@ -461,4 +464,111 @@ private static void setField(Object target, String fieldName, Object value) thro field.setAccessible(true); field.set(target, value); } + + // --- Plugin registration tests --- + + @Test + void testDefaultConfig_PluginRunnerIsNoOp() { + var config = DurableConfig.defaultConfig(); + + assertNotNull(config.getPluginRunner()); + assertTrue(config.getPluginRunner().isEmpty()); + } + + @Test + void testBuilder_NoPlugins_PluginRunnerIsNoOp() { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + assertNotNull(config.getPluginRunner()); + assertTrue(config.getPluginRunner().isEmpty()); + } + + @Test + void testBuilder_WithPlugin_CreatesActivePluginRunner() { + var plugin = new DurableExecutionPlugin() {}; + var config = DurableConfig.builder() + .withDurableExecutionClient(mockClient) + .withPlugins(plugin) + .build(); + + assertNotNull(config.getPluginRunner()); + assertFalse(config.getPluginRunner().isEmpty()); + } + + @Test + void testBuilder_WithMultiplePlugins_AllRegistered() { + var calls = new ArrayList(); + var plugin1 = new TestPlugin("p1", calls); + var plugin2 = new TestPlugin("p2", calls); + var config = DurableConfig.builder() + .withDurableExecutionClient(mockClient) + .withPlugins(plugin1, plugin2) + .build(); + + config.getPluginRunner() + .onInvocationStart(new software.amazon.lambda.durable.plugin.InvocationInfo("req-1", "arn:test", true)); + + assertEquals(List.of("p1:onInvocationStart", "p2:onInvocationStart"), calls); + } + + @Test + void testBuilder_WithPlugins_CalledMultipleTimes_Replaces() { + var calls = new ArrayList(); + var plugin1 = new TestPlugin("p1", calls); + var plugin2 = new TestPlugin("p2", calls); + var plugin3 = new TestPlugin("p3", calls); + + var config = DurableConfig.builder() + .withDurableExecutionClient(mockClient) + .withPlugins(plugin1) + .withPlugins(plugin2, plugin3) + .build(); + + config.getPluginRunner() + .onInvocationStart(new software.amazon.lambda.durable.plugin.InvocationInfo("req-1", "arn:test", true)); + + assertEquals(List.of("p2:onInvocationStart", "p3:onInvocationStart"), calls); + } + + @Test + void testBuilder_WithPlugins_NullArrayThrows() { + var builder = DurableConfig.builder(); + + var ex = assertThrows(NullPointerException.class, () -> builder.withPlugins((DurableExecutionPlugin[]) null)); + assertEquals("Plugins array cannot be null", ex.getMessage()); + } + + @Test + void testBuilder_WithPlugins_NullElementThrows() { + var builder = DurableConfig.builder(); + + var ex = assertThrows( + NullPointerException.class, () -> builder.withPlugins(new DurableExecutionPlugin[] {null})); + assertEquals("Plugin cannot be null", ex.getMessage()); + } + + @Test + void testBuilder_WithPlugins_FluentAPI() { + var builder = DurableConfig.builder(); + var plugin = new DurableExecutionPlugin() {}; + + assertSame(builder, builder.withPlugins(plugin)); + } + + /** Simple test plugin that records hook calls. */ + private static class TestPlugin implements DurableExecutionPlugin { + private final String name; + private final List calls; + + TestPlugin(String name, List calls) { + this.name = name; + this.calls = calls; + } + + @Override + public void onInvocationStart(software.amazon.lambda.durable.plugin.InvocationInfo info) { + calls.add(name + ":onInvocationStart"); + } + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java index d2462cf28..dc705835b 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java @@ -7,136 +7,59 @@ import java.time.Instant; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.Operation; -import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; class PluginInfoConverterTest { private static final String OPERATION_ID = "op-1"; - private static final String RAW_ID = "1"; private static final String OPERATION_NAME = "validate-order"; private static final String PARENT_ID = "parent-ctx"; - private static final String RAW_PARENT_ID = "0"; private static final Instant START = Instant.parse("2026-01-01T00:00:00Z"); private static final Instant END = Instant.parse("2026-01-01T00:00:05Z"); - // ─── toOperationInfo (from Operation) ──────────────────────────────── + private static final OperationIdentifier STEP_IDENTIFIER = + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.STEP); + private static final OperationIdentifier WAIT_IDENTIFIER = + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT); + private static final OperationIdentifier WAIT_FOR_CONDITION_IDENTIFIER = + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT_FOR_CONDITION); + private static final OperationIdentifier MAP_IDENTIFIER = + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.MAP); - @Test - void toOperationInfo_fromOperation_mapsAllFields() { - var operation = Operation.builder() - .id(OPERATION_ID) - .name(OPERATION_NAME) - .type(OperationType.STEP) - .subType("WaitForCondition") - .parentId(PARENT_ID) - .status(OperationStatus.SUCCEEDED) - .startTimestamp(START) - .endTimestamp(END) - .build(); - - var info = PluginInfoConverter.toOperationInfo(operation); - - assertEquals(OPERATION_ID, info.id()); - assertNull(info.rawId()); - assertEquals(OPERATION_NAME, info.name()); - assertEquals("STEP", info.type()); - assertEquals("WaitForCondition", info.subType()); - assertEquals(PARENT_ID, info.parentId()); - assertNull(info.rawParentId()); - assertEquals(START, info.startTimestamp()); - assertEquals(END, info.endTimestamp()); - } - - @Test - void toOperationInfo_fromNullOperation_returnsAllNulls() { - var info = PluginInfoConverter.toOperationInfo(null); - - assertNull(info.id()); - assertNull(info.rawId()); - assertNull(info.name()); - assertNull(info.type()); - assertNull(info.subType()); - assertNull(info.parentId()); - assertNull(info.rawParentId()); - assertNull(info.startTimestamp()); - assertNull(info.endTimestamp()); - } - - @Test - void toOperationInfo_fromOperation_handlesNullFields() { - var operation = Operation.builder() - .id(OPERATION_ID) - .status(OperationStatus.STARTED) - .startTimestamp(START) - .build(); - - var info = PluginInfoConverter.toOperationInfo(operation); - - assertEquals(OPERATION_ID, info.id()); - assertNull(info.rawId()); - assertNull(info.name()); - assertNull(info.type()); - assertNull(info.subType()); - assertNull(info.parentId()); - assertNull(info.rawParentId()); - assertEquals(START, info.startTimestamp()); - assertNull(info.endTimestamp()); - } - - // ─── toOperationInfo (from explicit params) ────────────────────────── + // ─── toOperationInfo ───────────────────────────────────────────────── @Test - void toOperationInfo_fromParams_mapsAllFields() { + void toOperationInfo_withIdentifier_mapsAllFields() { var operation = Operation.builder().startTimestamp(START).endTimestamp(END).build(); - var info = PluginInfoConverter.toOperationInfo( - operation, - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.STEP, - OperationSubType.WAIT_FOR_CONDITION, - PARENT_ID, - RAW_PARENT_ID); + var info = PluginInfoConverter.toOperationInfo(operation, WAIT_FOR_CONDITION_IDENTIFIER, PARENT_ID); assertEquals(OPERATION_ID, info.id()); - assertEquals(RAW_ID, info.rawId()); assertEquals(OPERATION_NAME, info.name()); assertEquals("STEP", info.type()); assertEquals("WaitForCondition", info.subType()); assertEquals(PARENT_ID, info.parentId()); - assertEquals(RAW_PARENT_ID, info.rawParentId()); assertEquals(START, info.startTimestamp()); assertEquals(END, info.endTimestamp()); } @Test - void toOperationInfo_fromParams_nullOperation_noTimestamps() { - var info = PluginInfoConverter.toOperationInfo( - null, OPERATION_ID, RAW_ID, OPERATION_NAME, OperationType.WAIT, OperationSubType.WAIT, null, null); + void toOperationInfo_withIdentifier_nullOperation_usesCurrentTime() { + var before = Instant.now(); + var info = PluginInfoConverter.toOperationInfo(null, WAIT_IDENTIFIER, null); assertEquals(OPERATION_ID, info.id()); - assertEquals(RAW_ID, info.rawId()); assertEquals(OPERATION_NAME, info.name()); assertEquals("WAIT", info.type()); assertEquals("Wait", info.subType()); assertNull(info.parentId()); - assertNull(info.rawParentId()); - assertNull(info.startTimestamp()); + assertNotNull(info.startTimestamp()); + assertFalse(info.startTimestamp().isBefore(before)); assertNull(info.endTimestamp()); } - @Test - void toOperationInfo_fromParams_nullSubType() { - var info = PluginInfoConverter.toOperationInfo( - null, OPERATION_ID, RAW_ID, OPERATION_NAME, OperationType.STEP, null, null, null); - - assertNull(info.subType()); - } - // ─── toOperationEndInfo ────────────────────────────────────────────── @Test @@ -145,24 +68,13 @@ void toOperationEndInfo_mapsAllFields() { Operation.builder().startTimestamp(START).endTimestamp(END).build(); var error = new RuntimeException("step failed"); - var info = PluginInfoConverter.toOperationEndInfo( - operation, - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.STEP, - OperationSubType.STEP, - PARENT_ID, - RAW_PARENT_ID, - error); + var info = PluginInfoConverter.toOperationEndInfo(operation, STEP_IDENTIFIER, PARENT_ID, error); assertEquals(OPERATION_ID, info.id()); - assertEquals(RAW_ID, info.rawId()); assertEquals(OPERATION_NAME, info.name()); assertEquals("STEP", info.type()); assertEquals("Step", info.subType()); assertEquals(PARENT_ID, info.parentId()); - assertEquals(RAW_PARENT_ID, info.rawParentId()); assertEquals(START, info.startTimestamp()); assertEquals(END, info.endTimestamp()); assertEquals(error, info.error()); @@ -173,16 +85,7 @@ void toOperationEndInfo_nullError_forSuccess() { var operation = Operation.builder().startTimestamp(START).endTimestamp(END).build(); - var info = PluginInfoConverter.toOperationEndInfo( - operation, - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.STEP, - OperationSubType.STEP, - null, - null, - null); + var info = PluginInfoConverter.toOperationEndInfo(operation, STEP_IDENTIFIER, null, null); assertNull(info.error()); } @@ -191,45 +94,25 @@ void toOperationEndInfo_nullError_forSuccess() { @Test void toUserFunctionStartInfo_stepAttempt() { - var info = PluginInfoConverter.toUserFunctionStartInfo( - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.STEP, - OperationSubType.STEP, - PARENT_ID, - RAW_PARENT_ID, - false, - 3); + var info = PluginInfoConverter.toUserFunctionStartInfo(STEP_IDENTIFIER, PARENT_ID, false, 3); assertEquals(OPERATION_ID, info.id()); - assertEquals(RAW_ID, info.rawId()); assertEquals(OPERATION_NAME, info.name()); assertEquals("STEP", info.type()); assertEquals("Step", info.subType()); assertEquals(PARENT_ID, info.parentId()); - assertEquals(RAW_PARENT_ID, info.rawParentId()); assertNotNull(info.startTimestamp()); - assertFalse(info.isReplay()); + assertFalse(info.isReplayingChildren()); assertEquals(3, info.attempt()); } @Test void toUserFunctionStartInfo_contextOperation() { - var info = PluginInfoConverter.toUserFunctionStartInfo( - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.CONTEXT, - OperationSubType.MAP, - PARENT_ID, - RAW_PARENT_ID, - true, - null); + var info = PluginInfoConverter.toUserFunctionStartInfo(MAP_IDENTIFIER, PARENT_ID, true, null); assertEquals("CONTEXT", info.type()); assertEquals("Map", info.subType()); - assertTrue(info.isReplay()); + assertTrue(info.isReplayingChildren()); assertNull(info.attempt()); } @@ -237,25 +120,15 @@ void toUserFunctionStartInfo_contextOperation() { @Test void toUserFunctionEndInfo_succeeded() { - var startInfo = PluginInfoConverter.toUserFunctionStartInfo( - OPERATION_ID, - RAW_ID, - OPERATION_NAME, - OperationType.STEP, - OperationSubType.STEP, - PARENT_ID, - RAW_PARENT_ID, - false, - 1); + var startInfo = PluginInfoConverter.toUserFunctionStartInfo(STEP_IDENTIFIER, PARENT_ID, false, 1); var endInfo = PluginInfoConverter.toUserFunctionEndInfo(startInfo, true, null); assertEquals(OPERATION_ID, endInfo.id()); - assertEquals(RAW_ID, endInfo.rawId()); assertEquals(OPERATION_NAME, endInfo.name()); assertEquals(startInfo.startTimestamp(), endInfo.startTimestamp()); assertNotNull(endInfo.endTimestamp()); - assertFalse(endInfo.isReplay()); + assertFalse(endInfo.isReplayingChildren()); assertEquals(1, endInfo.attempt()); assertTrue(endInfo.succeeded()); assertNull(endInfo.error()); @@ -264,8 +137,7 @@ void toUserFunctionEndInfo_succeeded() { @Test void toUserFunctionEndInfo_failed() { var error = new RuntimeException("step failed"); - var startInfo = PluginInfoConverter.toUserFunctionStartInfo( - OPERATION_ID, RAW_ID, OPERATION_NAME, OperationType.STEP, null, null, null, false, 2); + var startInfo = PluginInfoConverter.toUserFunctionStartInfo(STEP_IDENTIFIER, null, false, 2); var endInfo = PluginInfoConverter.toUserFunctionEndInfo(startInfo, false, error); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java index 0180b73cb..991af3265 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java @@ -142,21 +142,20 @@ private static InvocationEndInfo invocationEndInfo() { } private static OperationInfo operationInfo() { - return new OperationInfo("op-1", "1", "test-step", "STEP", null, null, null, Instant.now(), null); + return new OperationInfo("op-1", "test-step", "STEP", null, null, Instant.now(), null); } private static OperationEndInfo operationEndInfo() { - return new OperationEndInfo( - "op-1", "1", "test-step", "STEP", null, null, null, Instant.now(), Instant.now(), null); + return new OperationEndInfo("op-1", "test-step", "STEP", null, null, Instant.now(), Instant.now(), null); } private static UserFunctionStartInfo attemptInfo() { - return new UserFunctionStartInfo("op-1", "1", "test-step", "STEP", null, null, null, Instant.now(), false, 1); + return new UserFunctionStartInfo("op-1", "test-step", "STEP", null, null, Instant.now(), false, 1); } private static UserFunctionEndInfo attemptEndInfo() { return new UserFunctionEndInfo( - "op-1", "1", "test-step", "STEP", null, null, null, Instant.now(), Instant.now(), false, 1, true, null); + "op-1", "test-step", "STEP", null, null, Instant.now(), Instant.now(), false, 1, true, null); } // ─── Test plugin implementations ─────────────────────────────────────