From ed2944bb79bff1e52d93161f1a2b0a8f5125e6f7 Mon Sep 17 00:00:00 2001 From: nvasiu Date: Fri, 5 Jun 2026 18:36:05 +0000 Subject: [PATCH] fix(sdk): Deprecate StepConfig.semantics, add StepConfig.semanticsPerRetry with correct AT_MOST_ONCE behaviour --- docs/advanced/error-handling.md | 4 +- docs/core/steps.md | 12 +- docs/design.md | 2 +- .../general/ErrorHandlingExample.java | 5 +- .../durable/StepSemanticsIntegrationTest.java | 158 +++++++++++++++++- .../lambda/durable/config/StepConfig.java | 40 ++++- .../lambda/durable/config/StepSemantics.java | 11 +- .../durable/operation/StepOperation.java | 13 +- .../lambda/durable/config/StepConfigTest.java | 32 ++++ 9 files changed, 255 insertions(+), 22 deletions(-) diff --git a/docs/advanced/error-handling.md b/docs/advanced/error-handling.md index 51b194e8a..867ef2720 100644 --- a/docs/advanced/error-handling.md +++ b/docs/advanced/error-handling.md @@ -18,7 +18,7 @@ RuntimeException └── DurableOperationException - General operation exception ├── StepException - General Step exception │ ├── StepFailedException - Step exhausted all retry attempts. Catch to implement fallback logic or let execution fail. - │ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally) + │ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion and retries exhausted. Implement manual recovery (check if operation completed externally) ├── InvokeException - General chained invocation exception │ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure. │ ├── InvokeTimedOutException - Chained invocation timed out. Handle the error or propagate failure. 
@@ -38,7 +38,7 @@ try { var result = ctx.step("charge-payment", Payment.class, stepCtx -> paymentService.charge(amount), StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) .build()); } catch (StepInterruptedException e) { // Step started but we don't know if it completed diff --git a/docs/core/steps.md b/docs/core/steps.md index 770f631bb..628134b04 100644 --- a/docs/core/steps.md +++ b/docs/core/steps.md @@ -11,7 +11,7 @@ var result = ctx.step("call-api", Response.class, stepCtx -> externalApi.call(request), StepConfig.builder() .retryStrategy(...) - .semantics(...) + .semanticsPerRetry(...) .build()); ``` @@ -42,7 +42,7 @@ Configure step behavior with `StepConfig`: ctx.step("my-step", Result.class, stepCtx -> doWork(), StepConfig.builder() .retryStrategy(...) // How to handle failures - .semantics(...) // At-least-once vs at-most-once + .semanticsPerRetry(...) // At-least-once vs at-most-once .serDes(...) // Custom serialization .build()); ``` @@ -73,7 +73,7 @@ Control how steps behave when interrupted mid-execution: | Semantic | Behavior | Use Case | |----------|----------|----------| | `AT_LEAST_ONCE_PER_RETRY` (default) | Re-executes step if interrupted before completion | Idempotent operations (database upserts, API calls with idempotency keys) | -| `AT_MOST_ONCE_PER_RETRY` | Never re-executes; throws `StepInterruptedException` if interrupted | Non-idempotent operations (sending emails, charging payments) | +| `AT_MOST_ONCE_PER_RETRY` | Re-executes step once per retry if interrupted. 
Throws `StepInterruptedException` if retries exhausted | Non-idempotent operations (sending emails, charging payments) | ```java // Default: at-least-once per retry (step may re-run if interrupted) @@ -84,14 +84,14 @@ var result = ctx.step("idempotent-update", Result.class, var result = ctx.step("send-email", Result.class, stepCtx -> emailService.send(notification), StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) .build()); ``` **Important**: These semantics apply *per retry attempt*, not per overall execution: - **AT_LEAST_ONCE_PER_RETRY**: The step executes at least once per retry. If the step succeeds but checkpointing fails (e.g., sandbox crash), the step re-executes on replay. -- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is skipped on replay and `StepInterruptedException` is thrown. +- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is re-executed on a new retry attempt. `StepInterruptedException` is thrown if retries are exhausted. 
To achieve step-level at-most-once semantics, combine with a no-retry strategy: @@ -100,7 +100,7 @@ To achieve step-level at-most-once semantics, combine with a no-retry strategy: var result = ctx.step("charge-payment", Result.class, stepCtx -> paymentService.charge(amount), StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) .retryStrategy(RetryStrategies.Presets.NO_RETRY) .build()); ``` diff --git a/docs/design.md b/docs/design.md index 69d6b79de..0960c5e74 100644 --- a/docs/design.md +++ b/docs/design.md @@ -199,7 +199,7 @@ context.step("name", Type.class, stepCtx -> doWork(), StepConfig.builder() .serDes(stepSpecificSerDes) .retryStrategy(RetryStrategies.exponentialBackoff(3, Duration.ofSeconds(1))) - .semantics(AT_MOST_ONCE_PER_RETRY) + .semanticsPerRetry(AT_MOST_ONCE_PER_RETRY) .build()); ``` diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/general/ErrorHandlingExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/general/ErrorHandlingExample.java index 4c0656122..ecd70cf99 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/general/ErrorHandlingExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/general/ErrorHandlingExample.java @@ -73,7 +73,7 @@ public String handleRequest(Object input, DurableContext context) { // Example 2: Handling StepInterruptedException for AT_MOST_ONCE operations // StepInterruptedException is thrown when an AT_MOST_ONCE step was started - // but the function was interrupted before the step completed. + // but the function was interrupted before the step completed on every attempt. // In normal execution, this step succeeds. The catch block handles the // interruption scenario that occurs during replay after an unexpected termination. 
String paymentResult; @@ -83,7 +83,8 @@ public String handleRequest(Object input, DurableContext context) { String.class, stepCtx -> "payment-" + input, StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .retryStrategy(RetryStrategies.Presets.NO_RETRY) .build()); } catch (StepInterruptedException e) { logger.warn( diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java index 1b3b67409..53a4eaf83 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java @@ -37,6 +37,29 @@ void testAtLeastOnceCompletesSuccessfully() { assertEquals(1, executionCount.get()); } + @Test + void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.step( + "my-step", + String.class, + stepCtx -> { + executionCount.incrementAndGet(); + return "result"; + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY) + .build())); + + var result = runner.run("test-input"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(1, executionCount.get()); + } + @Test void testAtMostOnceCompletesSuccessfully() { var executionCount = new AtomicInteger(0); @@ -60,6 +83,29 @@ void testAtMostOnceCompletesSuccessfully() { assertEquals(1, executionCount.get()); } + @Test + void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.step( + "my-step", + String.class, + stepCtx -> { + 
executionCount.incrementAndGet(); + return "result"; + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .build())); + + var result = runner.run("test-input"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(1, executionCount.get()); + } + @Test void testAtMostOnceNoRetryFailsImmediately() { var executionCount = new AtomicInteger(0); @@ -84,6 +130,30 @@ void testAtMostOnceNoRetryFailsImmediately() { assertEquals(1, executionCount.get()); } + @Test + void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.step( + "my-step", + String.class, + stepCtx -> { + executionCount.incrementAndGet(); + throw new RuntimeException("Always fails"); + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build())); + + var result = runner.run("test-input"); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + assertEquals(1, executionCount.get()); + } + @Test void testDefaultSemanticsIsAtLeastOnce() { var executionCount = new AtomicInteger(0); @@ -129,6 +199,34 @@ void testAtLeastOnceReExecutesAfterCheckpointLoss() { assertEquals(2, executionCount.get()); } + @Test + void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + return context.step( + "step", + String.class, + stepCtx -> { + var count = executionCount.incrementAndGet(); + return "Executed " + count + " times"; + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY) + .build()); + }); + + runner.run("test"); + assertEquals(1, executionCount.get()); + + runner.simulateFireAndForgetCheckpointLoss("step"); + + var result = runner.run("test"); + + 
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(2, executionCount.get()); + } + @Test void testAtLeastOnceReExecutesAfterCheckpointFailure() { var executionCount = new AtomicInteger(0); @@ -157,7 +255,37 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() { } @Test - void testAtMostOnceThrowsExceptionAfterCheckpointFailure() { + void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + return context.step( + "step", + String.class, + stepCtx -> { + var count = executionCount.incrementAndGet(); + return "Executed " + count + " times"; + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY) + .build()); + }); + + runner.run("test"); + assertEquals(1, executionCount.get()); + + runner.resetCheckpointToStarted("step"); + var result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(2, executionCount.get()); + } + + // This behavior is incorrect (the step should retry after interruption), but is kept for backward + // compatibility. The deprecated StepConfig.semantics() method preserves this behavior. + // Use StepConfig.semanticsPerRetry() for the corrected behavior (see below test). 
+ @Test + void testAtMostOnceThrowsExceptionAfterCheckpointFailure_deprecatedBackwardCompatibility() { var executionCount = new AtomicInteger(0); var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { @@ -183,4 +311,32 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() { assertEquals(ExecutionStatus.FAILED, result.getStatus()); assertEquals(1, executionCount.get()); } + + @Test + void testSemanticsPerRetry_atMostOnceRetriesAfterInterruption() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + return context.step( + "step", + String.class, + stepCtx -> { + executionCount.incrementAndGet(); + return "result"; + }, + StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .build()); + }); + + runner.run("test"); + assertEquals(1, executionCount.get()); + + runner.resetCheckpointToStarted("step"); + + var result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(2, executionCount.get()); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java index 8eada6faf..6b4a49481 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java @@ -15,11 +15,13 @@ public class StepConfig { private final RetryStrategy retryStrategy; private final StepSemantics semantics; + private final StepSemantics semanticsPerRetry; private final SerDes serDes; private StepConfig(Builder builder) { this.retryStrategy = builder.retryStrategy; this.semantics = builder.semantics; + this.semanticsPerRetry = builder.semanticsPerRetry; this.serDes = builder.serDes; } @@ -28,18 +30,29 @@ public RetryStrategy retryStrategy() { return retryStrategy != null ? 
retryStrategy : RetryStrategies.Presets.DEFAULT; } - /** Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified. */ + /** + * Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified. + * + * @deprecated Use {@link #semanticsPerRetry()} instead. This method has incorrect behavior where + * AT_MOST_ONCE_PER_RETRY does not retry after step interruption. + */ + @Deprecated(forRemoval = true) public StepSemantics semantics() { return semantics != null ? semantics : StepSemantics.AT_LEAST_ONCE_PER_RETRY; } + /** Returns the delivery semantics per retry for this step, or null if not specified. */ + public StepSemantics semanticsPerRetry() { + return semanticsPerRetry; + } + /** Returns the custom serializer for this step, or null if not specified (uses default SerDes). */ public SerDes serDes() { return serDes; } public Builder toBuilder() { - return new Builder(retryStrategy, semantics, serDes); + return new Builder(retryStrategy, semantics, semanticsPerRetry, serDes); } /** @@ -48,18 +61,21 @@ public Builder toBuilder() { * @return a new Builder instance */ public static Builder builder() { - return new Builder(null, null, null); + return new Builder(null, null, null, null); } /** Builder for creating StepConfig instances. 
*/ public static class Builder { private RetryStrategy retryStrategy; private StepSemantics semantics; + private StepSemantics semanticsPerRetry; private SerDes serDes; - public Builder(RetryStrategy retryStrategy, StepSemantics semantics, SerDes serDes) { + public Builder( + RetryStrategy retryStrategy, StepSemantics semantics, StepSemantics semanticsPerRetry, SerDes serDes) { this.retryStrategy = retryStrategy; this.semantics = semantics; + this.semanticsPerRetry = semanticsPerRetry; this.serDes = serDes; } @@ -79,12 +95,28 @@ public Builder retryStrategy(RetryStrategy retryStrategy) { * * @param semantics the delivery semantics to use, defaults to AT_LEAST_ONCE_PER_RETRY if not specified * @return this builder for method chaining + * @deprecated Use {@link #semanticsPerRetry(StepSemantics)} instead. This method has incorrect behavior where + * AT_MOST_ONCE_PER_RETRY does not retry after step interruption. */ + @Deprecated(forRemoval = true) public Builder semantics(StepSemantics semantics) { this.semantics = semantics; return this; } + /** + * Sets the delivery semantics per retry for the step. + * + *
<p>
If set, this takes precedence over {@link #semantics(StepSemantics)}. + * + * @param semanticsPerRetry the delivery semantics to use + * @return this builder for method chaining + */ + public Builder semanticsPerRetry(StepSemantics semanticsPerRetry) { + this.semanticsPerRetry = semanticsPerRetry; + return this; + } + /** * Sets a custom serializer for the step. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/config/StepSemantics.java b/sdk/src/main/java/software/amazon/lambda/durable/config/StepSemantics.java index fc23ab3c7..183991d0f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/config/StepSemantics.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/config/StepSemantics.java @@ -15,9 +15,14 @@ public enum StepSemantics { AT_LEAST_ONCE_PER_RETRY, /** - * At-most-once delivery per retry attempt. The step will not be re-executed if interrupted. START checkpoint is - * awaited before user code runs. If interrupted, throws - * {@link software.amazon.lambda.durable.exception.StepInterruptedException}. + * At-most-once delivery per retry attempt. START checkpoint is awaited before user code runs. If interrupted, + * throws {@link software.amazon.lambda.durable.exception.StepInterruptedException}. + * + *
<p>
When used with {@link StepConfig.Builder#semantics(StepSemantics)}: interrupted steps are NOT retried + * (incorrect behavior, kept for backward compatibility). + * + *
<p>
When used with {@link StepConfig.Builder#semanticsPerRetry(StepSemantics)}: interrupted steps are retried + * according to the retry strategy (correct behavior). */ AT_MOST_ONCE_PER_RETRY } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index e8dce59bc..7ac2d4340 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -66,7 +66,7 @@ protected void replay(Operation existing) { switch (existing.status()) { case SUCCEEDED, FAILED -> markAlreadyCompleted(); case STARTED -> { - if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) { + if (isAtMostOnce()) { // AT_MOST_ONCE: treat as interrupted, go through retry logic handleStepFailure(new StepInterruptedException(existing), attempt); } else { @@ -128,7 +128,7 @@ private void checkpointStarted() { if (existing == null || existing.status() != OperationStatus.STARTED) { var startUpdate = OperationUpdate.builder().action(OperationAction.START); - if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) { + if (isAtMostOnce()) { // AT_MOST_ONCE: await START checkpoint before executing user code sendOperationUpdate(startUpdate); } else { @@ -165,7 +165,8 @@ private void handleStepFailure(Throwable exception, int attempt) { errorObject = serializeException(exception); } - var isRetryable = !(exception instanceof StepInterruptedException); + var isRetryable = !(exception instanceof StepInterruptedException) + || config.semanticsPerRetry() == StepSemantics.AT_MOST_ONCE_PER_RETRY; var retryDecision = config.retryStrategy().makeRetryDecision(exception, attempt); if (isRetryable && retryDecision.shouldRetry()) { @@ -217,4 +218,10 @@ public T get() { throw new StepFailedException(op); } } + + @SuppressWarnings("deprecation") + private boolean isAtMostOnce() { + var semantics 
= config.semanticsPerRetry() != null ? config.semanticsPerRetry() : config.semantics(); + return semantics == StepSemantics.AT_MOST_ONCE_PER_RETRY; + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/config/StepConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/config/StepConfigTest.java index 033e0b4d5..a65b6bea9 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/config/StepConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/config/StepConfigTest.java @@ -44,6 +44,22 @@ void testBuilderChaining() { assertEquals(customSerDes, config.serDes()); } + @Test + void testBuilderChainingWithSemanticsPerRetry() { + var strategy = RetryStrategies.Presets.DEFAULT; + var customSerDes = new JacksonSerDes(); + + var config = StepConfig.builder() + .retryStrategy(strategy) + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .serDes(customSerDes) + .build(); + + assertEquals(strategy, config.retryStrategy()); + assertEquals(StepSemantics.AT_MOST_ONCE_PER_RETRY, config.semanticsPerRetry()); + assertEquals(customSerDes, config.serDes()); + } + @Test void testBuilderWithNullRetryStrategy() { var config = StepConfig.builder().retryStrategy(null).build(); @@ -58,6 +74,22 @@ void testSemanticsDefaultsToAtLeastOnce() { assertEquals(StepSemantics.AT_LEAST_ONCE_PER_RETRY, config.semantics()); } + @Test + void testSemanticsPerRetryDefaultsToNull() { + var config = StepConfig.builder().build(); + + assertNull(config.semanticsPerRetry()); + } + + @Test + void testBuilderWithSemanticsPerRetry() { + var config = StepConfig.builder() + .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .build(); + + assertEquals(StepSemantics.AT_MOST_ONCE_PER_RETRY, config.semanticsPerRetry()); + } + @Test void testBuilderWithCustomSerDes() { var customSerDes = new JacksonSerDes();