Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/advanced/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RuntimeException
└── DurableOperationException - General operation exception
├── StepException - General Step exception
│ ├── StepFailedException - Step exhausted all retry attempts. Catch to implement fallback logic or let execution fail.
│ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally)
│ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion and retries exhausted. Implement manual recovery (check if operation completed externally)
├── InvokeException - General chained invocation exception
│ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure.
│ ├── InvokeTimedOutException - Chained invocation timed out. Handle the error or propagate failure.
Expand All @@ -38,7 +38,7 @@ try {
var result = ctx.step("charge-payment", Payment.class,
stepCtx -> paymentService.charge(amount),
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build());
} catch (StepInterruptedException e) {
// Step started but we don't know if it completed
Expand Down
12 changes: 6 additions & 6 deletions docs/core/steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var result = ctx.step("call-api", Response.class,
stepCtx -> externalApi.call(request),
StepConfig.builder()
.retryStrategy(...)
.semantics(...)
.semanticsPerRetry(...)
.build());
```

Expand Down Expand Up @@ -42,7 +42,7 @@ Configure step behavior with `StepConfig`:
ctx.step("my-step", Result.class, stepCtx -> doWork(),
StepConfig.builder()
.retryStrategy(...) // How to handle failures
.semantics(...) // At-least-once vs at-most-once
.semanticsPerRetry(...) // At-least-once vs at-most-once
.serDes(...) // Custom serialization
.build());
```
Expand Down Expand Up @@ -73,7 +73,7 @@ Control how steps behave when interrupted mid-execution:
| Semantic | Behavior | Use Case |
|----------|----------|----------|
| `AT_LEAST_ONCE_PER_RETRY` (default) | Re-executes step if interrupted before completion | Idempotent operations (database upserts, API calls with idempotency keys) |
| `AT_MOST_ONCE_PER_RETRY` | Never re-executes; throws `StepInterruptedException` if interrupted | Non-idempotent operations (sending emails, charging payments) |
| `AT_MOST_ONCE_PER_RETRY` | Re-executes step once per retry if interrupted. Throws `StepInterruptedException` if retries exhausted | Non-idempotent operations (sending emails, charging payments) |

```java
// Default: at-least-once per retry (step may re-run if interrupted)
Expand All @@ -84,14 +84,14 @@ var result = ctx.step("idempotent-update", Result.class,
var result = ctx.step("send-email", Result.class,
stepCtx -> emailService.send(notification),
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build());
```

**Important**: These semantics apply *per retry attempt*, not per overall execution:

- **AT_LEAST_ONCE_PER_RETRY**: The step executes at least once per retry. If the step succeeds but checkpointing fails (e.g., sandbox crash), the step re-executes on replay.
- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is skipped on replay and `StepInterruptedException` is thrown.
- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is re-executed on a new retry attempt. `StepInterruptedException` is thrown if retries are exhausted.

To achieve step-level at-most-once semantics, combine with a no-retry strategy:

Expand All @@ -100,7 +100,7 @@ To achieve step-level at-most-once semantics, combine with a no-retry strategy:
var result = ctx.step("charge-payment", Result.class,
stepCtx -> paymentService.charge(amount),
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
```
Expand Down
2 changes: 1 addition & 1 deletion docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ context.step("name", Type.class, stepCtx -> doWork(),
StepConfig.builder()
.serDes(stepSpecificSerDes)
.retryStrategy(RetryStrategies.exponentialBackoff(3, Duration.ofSeconds(1)))
.semantics(AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(AT_MOST_ONCE_PER_RETRY)
.build());
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String handleRequest(Object input, DurableContext context) {

// Example 2: Handling StepInterruptedException for AT_MOST_ONCE operations
// StepInterruptedException is thrown when an AT_MOST_ONCE step was started
// but the function was interrupted before the step completed.
// but the function was interrupted before the step completed on every attempt.
// In normal execution, this step succeeds. The catch block handles the
// interruption scenario that occurs during replay after an unexpected termination.
String paymentResult;
Expand All @@ -83,7 +83,8 @@ public String handleRequest(Object input, DurableContext context) {
String.class,
stepCtx -> "payment-" + input,
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
} catch (StepInterruptedException e) {
logger.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@ void testAtLeastOnceCompletesSuccessfully() {
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testAtMostOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);
Expand All @@ -60,6 +83,29 @@ void testAtMostOnceCompletesSuccessfully() {
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testAtMostOnceNoRetryFailsImmediately() {
var executionCount = new AtomicInteger(0);
Expand All @@ -84,6 +130,30 @@ void testAtMostOnceNoRetryFailsImmediately() {
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
throw new RuntimeException("Always fails");
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testDefaultSemanticsIsAtLeastOnce() {
var executionCount = new AtomicInteger(0);
Expand Down Expand Up @@ -129,6 +199,34 @@ void testAtLeastOnceReExecutesAfterCheckpointLoss() {
assertEquals(2, executionCount.get());
}

@Test
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.simulateFireAndForgetCheckpointLoss("step");

var result = runner.run("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
}

@Test
void testAtLeastOnceReExecutesAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);
Expand Down Expand Up @@ -157,7 +255,37 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() {
}

@Test
void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.resetCheckpointToStarted("step");
var result = runner.runUntilComplete("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
}

// This behavior is incorrect (the step should retry after interruption), but is kept for backward
// compatibility. The deprecated StepConfig.semantics() method preserves this behavior.
// Use StepConfig.semanticsPerRetry() for the corrected behavior (see below test).
@Test
void testAtMostOnceThrowsExceptionAfterCheckpointFailure_deprecatedBackwardCompatibility() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
Expand All @@ -183,4 +311,32 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceRetriesAfterInterruption() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.resetCheckpointToStarted("step");

var result = runner.runUntilComplete("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
public class StepConfig {
private final RetryStrategy retryStrategy;
private final StepSemantics semantics;
private final StepSemantics semanticsPerRetry;
private final SerDes serDes;

private StepConfig(Builder builder) {
this.retryStrategy = builder.retryStrategy;
this.semantics = builder.semantics;
this.semanticsPerRetry = builder.semanticsPerRetry;
this.serDes = builder.serDes;
}

Expand All @@ -28,18 +30,29 @@ public RetryStrategy retryStrategy() {
return retryStrategy != null ? retryStrategy : RetryStrategies.Presets.DEFAULT;
}

/** Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified. */
/**
* Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified.
*
* @deprecated Use {@link #semanticsPerRetry()} instead. This method has incorrect behavior where
* AT_MOST_ONCE_PER_RETRY does not retry after step interruption.
*/
@Deprecated(forRemoval = true)
public StepSemantics semantics() {
return semantics != null ? semantics : StepSemantics.AT_LEAST_ONCE_PER_RETRY;
}

/** Returns the delivery semantics per retry for this step, or null if not specified. */
public StepSemantics semanticsPerRetry() {
return semanticsPerRetry;
}

/** Returns the custom serializer for this step, or null if not specified (uses default SerDes). */
public SerDes serDes() {
return serDes;
}

public Builder toBuilder() {
return new Builder(retryStrategy, semantics, serDes);
return new Builder(retryStrategy, semantics, semanticsPerRetry, serDes);
}

/**
Expand All @@ -48,18 +61,21 @@ public Builder toBuilder() {
* @return a new Builder instance
*/
public static Builder builder() {
return new Builder(null, null, null);
return new Builder(null, null, null, null);
}

/** Builder for creating StepConfig instances. */
public static class Builder {
private RetryStrategy retryStrategy;
private StepSemantics semantics;
private StepSemantics semanticsPerRetry;
private SerDes serDes;

public Builder(RetryStrategy retryStrategy, StepSemantics semantics, SerDes serDes) {
public Builder(
RetryStrategy retryStrategy, StepSemantics semantics, StepSemantics semanticsPerRetry, SerDes serDes) {
this.retryStrategy = retryStrategy;
this.semantics = semantics;
this.semanticsPerRetry = semanticsPerRetry;
this.serDes = serDes;
}

Expand All @@ -79,12 +95,28 @@ public Builder retryStrategy(RetryStrategy retryStrategy) {
*
* @param semantics the delivery semantics to use, defaults to AT_LEAST_ONCE_PER_RETRY if not specified
* @return this builder for method chaining
* @deprecated Use {@link #semanticsPerRetry(StepSemantics)} instead. This method has incorrect behavior where
* AT_MOST_ONCE_PER_RETRY does not retry after step interruption.
*/
@Deprecated(forRemoval = true)
public Builder semantics(StepSemantics semantics) {
this.semantics = semantics;
return this;
}

/**
* Sets the delivery semantics per retry for the step.
*
* <p>If set, this takes precedence over {@link #semantics(StepSemantics)}.
*
* @param semanticsPerRetry the delivery semantics to use
* @return this builder for method chaining
*/
public Builder semanticsPerRetry(StepSemantics semanticsPerRetry) {
this.semanticsPerRetry = semanticsPerRetry;
return this;
}

/**
* Sets a custom serializer for the step.
*
Expand Down
Loading
Loading