From c0f79fa8464ebbe3fa0b7d8fea00337ebacc00d2 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 29 Dec 2025 11:52:46 -0800 Subject: [PATCH 1/3] Add Async.await() for non-blocking condition waiting Introduces Async.await() methods that return a Promise instead of blocking, allowing workflows to wait for conditions asynchronously. This enables concurrent condition waiting and Promise composition with anyOf/allOf. Key changes: - Add Async.await(Supplier) returning Promise - Add Async.await(Duration, Supplier) returning Promise - Implement condition watchers in SyncWorkflowContext evaluated via beforeThreadsWakeUp callback in DeterministicRunner - Support cancellation through CancellationScope - Add comprehensive tests including chaining, anyOf/allOf, and cancellation --- .../WorkflowOutboundCallsInterceptor.java | 20 ++ .../WorkflowOutboundCallsInterceptorBase.java | 10 + .../internal/sync/DeterministicRunner.java | 27 +- .../sync/DeterministicRunnerImpl.java | 26 +- .../temporal/internal/sync/SyncWorkflow.java | 3 +- .../internal/sync/SyncWorkflowContext.java | 220 +++++++++++++ .../internal/sync/WorkflowInternal.java | 16 + .../main/java/io/temporal/workflow/Async.java | 33 ++ .../sync/DeterministicRunnerTest.java | 76 +++++ .../io/temporal/workflow/AsyncAwaitTest.java | 291 ++++++++++++++++++ .../TestActivityEnvironmentInternal.java | 10 + .../internal/TracingWorkerInterceptor.java | 16 + 12 files changed, 743 insertions(+), 5 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index d515817b29..b49fca3dc6 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -742,6 +742,26 @@ public DynamicUpdateHandler getHandler() { void await(String reason, Supplier unblockCondition); + /** + * Asynchronously wait until unblockCondition evaluates to true. + * + * @param unblockCondition condition that should return true to indicate completion + * @return Promise that completes when the condition becomes true, or completes exceptionally with + * CanceledFailure if the enclosing CancellationScope is canceled + */ + Promise awaitAsync(Supplier unblockCondition); + + /** + * Asynchronously wait until unblockCondition evaluates to true or timeout expires. + * + * @param timeout maximum time to wait for the condition + * @param unblockCondition condition that should return true to indicate completion + * @return Promise that completes with true if the condition was satisfied, false if the timeout + * expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is + * canceled + */ + Promise awaitAsync(Duration timeout, Supplier unblockCondition); + Promise newTimer(Duration duration); Promise newTimer(Duration duration, TimerOptions options); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index 9d99d4c78b..a79a05ce84 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -75,6 +75,16 @@ public void await(String reason, Supplier unblockCondition) { next.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + return next.awaitAsync(unblockCondition); + } + + @Override + public Promise awaitAsync(Duration timeout, Supplier unblockCondition) { + return next.awaitAsync(timeout, unblockCondition); + } + @Override public Promise newTimer(Duration duration) { return next.newTimer(duration); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java index efafe230af..904d9f90ee 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java @@ -3,6 +3,7 @@ import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.workflow.CancellationScope; import java.util.Optional; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,7 +30,7 @@ static DeterministicRunner newRunner( SyncWorkflowContext workflowContext, Runnable root, WorkflowExecutorCache cache) { - return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache); + return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache, null); } /** @@ -44,7 +45,29 @@ static DeterministicRunner newRunner( WorkflowThreadExecutor workflowThreadExecutor, SyncWorkflowContext workflowContext, Runnable root) { - return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null); + return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null, null); + } + + /** + * Create new instance of DeterministicRunner with a callback invoked before threads wake up. + * + * @param workflowThreadExecutor executor for workflow thread Runnables + * @param workflowContext workflow context to use + * @param root function that root thread of the runner executes. + * @param cache WorkflowExecutorCache used cache inflight workflows + * @param beforeThreadsWakeUp callback invoked once per loop iteration before threads run. Returns + * true if progress was made (e.g., a condition watcher fired), which causes the loop to + * continue even if all threads are blocked. Returns false if no progress was made. + * @return instance of the DeterministicRunner. + */ + static DeterministicRunner newRunner( + WorkflowThreadExecutor workflowThreadExecutor, + SyncWorkflowContext workflowContext, + Runnable root, + WorkflowExecutorCache cache, + @Nullable Supplier beforeThreadsWakeUp) { + return new DeterministicRunnerImpl( + workflowThreadExecutor, workflowContext, root, cache, beforeThreadsWakeUp); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java index 234f71bff5..0c5c4618dc 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java @@ -69,6 +69,9 @@ class DeterministicRunnerImpl implements DeterministicRunner { // always accessed under the runner lock private final List toExecuteInWorkflowThread = new ArrayList<>(); + // Callback invoked before threads wake up in each event loop iteration + @Nullable private final Supplier beforeThreadsWakeUp; + // Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be // synchronized. // Inside DeterministicRunner the access to these variables is under the runner lock. @@ -144,7 +147,7 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { WorkflowThreadExecutor workflowThreadExecutor, @Nonnull SyncWorkflowContext workflowContext, Runnable root) { - this(workflowThreadExecutor, workflowContext, root, null); + this(workflowThreadExecutor, workflowContext, root, null, null); } DeterministicRunnerImpl( @@ -152,12 +155,22 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { @Nonnull SyncWorkflowContext workflowContext, Runnable root, WorkflowExecutorCache cache) { + this(workflowThreadExecutor, workflowContext, root, cache, null); + } + + DeterministicRunnerImpl( + WorkflowThreadExecutor workflowThreadExecutor, + @Nonnull SyncWorkflowContext workflowContext, + Runnable root, + WorkflowExecutorCache cache, + @Nullable Supplier beforeThreadsWakeUp) { this.workflowThreadExecutor = workflowThreadExecutor; this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext"); // TODO this should be refactored, publishing of this in an constructor into external objects is // a bad practice this.workflowContext.setRunner(this); this.cache = cache; + this.beforeThreadsWakeUp = beforeThreadsWakeUp; boolean deterministicCancellationScopeOrder = workflowContext .getReplayContext() @@ -208,7 +221,16 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { appendCallbackThreadsLocked(); } toExecuteInWorkflowThread.clear(); - progress = false; + + // Invoke beforeThreadsWakeUp callback BEFORE running threads. + // This allows the callback to evaluate conditions and complete promises, + // ensuring threads see updated state when they wake up. + if (beforeThreadsWakeUp != null) { + progress = beforeThreadsWakeUp.get(); + } else { + progress = false; + } + Iterator ci = threads.iterator(); while (ci.hasNext()) { WorkflowThread c = ci.next(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index a9f1f1107d..77b34e6362 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -124,7 +124,8 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { context.getWorkflowExecution())) .start(); }, - cache); + cache, + workflowContext.getBeforeThreadsWakeUpCallback()); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 977d9754e6..aae8c62b19 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -54,6 +54,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; @@ -104,6 +105,9 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private final WorkflowThreadLocal currentUpdateInfo = new WorkflowThreadLocal<>(); @Nullable private String currentDetails; + // Condition watchers for async await functionality + private final List conditionWatchers = new ArrayList<>(); + public SyncWorkflowContext( @Nonnull String namespace, @Nonnull WorkflowExecution workflowExecution, @@ -1327,6 +1331,135 @@ public void await(String reason, Supplier unblockCondition) { WorkflowThread.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + // Check if condition is already true + setReadOnly(true); + try { + if (unblockCondition.get()) { + return Workflow.newPromise(null); + } + } finally { + setReadOnly(false); + } + + CompletablePromise result = Workflow.newPromise(); + + // Capture cancellation state - the condition will be evaluated from the runner thread + // where CancellationScope.current() is not available + AtomicBoolean cancelled = new AtomicBoolean(false); + + Functions.Proc cancelHandle = + registerConditionWatcher( + () -> { + if (cancelled.get()) { + throw new CanceledFailure("cancelled"); + } + return unblockCondition.get(); + }, + (e) -> { + // Complete promise directly - this runs after condition evaluation + // but before threads run, so blocked threads will see completed state + if (e == null) { + result.complete(null); + } else { + result.completeExceptionally(e); + } + }); + + // Handle cancellation + CancellationScope.current() + .getCancellationRequest() + .thenApply( + (r) -> { + cancelled.set(true); + result.completeExceptionally(new CanceledFailure(r)); + cancelHandle.apply(); // Remove the watcher + return r; + }); + + return result; + } + + @Override + public Promise awaitAsync(Duration timeout, Supplier unblockCondition) { + // Check if condition is already true + setReadOnly(true); + try { + if (unblockCondition.get()) { + return Workflow.newPromise(true); + } + } finally { + setReadOnly(false); + } + + CompletablePromise result = Workflow.newPromise(); + + // Capture cancellation state - the condition will be evaluated from the runner thread + // where CancellationScope.current() is not available + AtomicBoolean cancelled = new AtomicBoolean(false); + + // Create timer - need access to cancellation handle + AtomicReference> timerCancellation = new AtomicReference<>(); + AtomicBoolean timerCompleted = new AtomicBoolean(false); + + timerCancellation.set( + replayContext.newTimer( + timeout, + null, // metadata + (e) -> { + // Set timer flag directly so condition watcher sees it immediately + if (e == null) { + timerCompleted.set(true); + } + // Timer cancellation exceptions are ignored - we just care if it fired + })); + + // Register with current CancellationScope for timer cancellation + CancellationScope.current() + .getCancellationRequest() + .thenApply( + (r) -> { + timerCancellation.get().apply(new CanceledFailure(r)); + return r; + }); + + Functions.Proc cancelHandle = + registerConditionWatcher( + () -> { + if (cancelled.get()) { + throw new CanceledFailure("cancelled"); + } + return unblockCondition.get() || timerCompleted.get(); + }, + (e) -> { + // Complete promise directly so blocked threads see it immediately + if (e != null) { + result.completeExceptionally(e); + } else { + boolean conditionMet = unblockCondition.get(); + result.complete(conditionMet); + if (conditionMet && !timerCompleted.get()) { + // Cancel timer since condition was met first + timerCancellation.get().apply(new CanceledFailure("condition met")); + } + } + }); + + // Handle cancellation - complete result promise + CancellationScope.current() + .getCancellationRequest() + .thenApply( + (r) -> { + cancelled.set(true); + result.completeExceptionally(new CanceledFailure(r)); + cancelHandle.apply(); // Remove the watcher + return r; + }); + + return result; + } + @SuppressWarnings("deprecation") @Override public void continueAsNew(ContinueAsNewInput input) { @@ -1584,4 +1717,91 @@ public Failure getFailure() { return failure; } } + + /** + * Returns a callback to be used by DeterministicRunner before threads wake up. This callback + * evaluates condition watchers and completes promises as needed. + */ + public Supplier getBeforeThreadsWakeUpCallback() { + return this::evaluateConditionWatchers; + } + + /** + * Registers a condition watcher for async await functionality. The condition is evaluated at the + * end of each event loop iteration. + * + * @param condition Supplier that returns true when the wait should complete. Evaluated in + * read-only mode. + * @param callback Called when condition becomes true (with null) or on error (with exception). + * @return Handle to cancel the wait. Invoke to unregister the condition. + */ + Functions.Proc registerConditionWatcher( + Supplier condition, Functions.Proc1 callback) { + ConditionWatcher watcher = new ConditionWatcher(condition, callback); + conditionWatchers.add(watcher); + return watcher.getCancelHandler(); + } + + /** + * Evaluates all condition watchers and invokes callbacks for satisfied conditions. Watchers that + * are satisfied or have thrown exceptions are removed from the list. + * + * @return true if any condition was satisfied (indicating progress was made) + */ + private boolean evaluateConditionWatchers() { + boolean anyMatched = false; + Iterator it = conditionWatchers.iterator(); + while (it.hasNext()) { + ConditionWatcher watcher = it.next(); + if (watcher.canceled) { + it.remove(); + continue; + } + + boolean matched; + try { + // We must set read-only mode here because the condition is evaluated from the runner + // thread, not a workflow thread. The wrapper in WorkflowInternal.awaitAsync uses + // getRootWorkflowContext() which requires being called from a workflow thread. + setReadOnly(true); + try { + matched = watcher.condition.get(); + } finally { + setReadOnly(false); + } + } catch (RuntimeException e) { + // Condition threw - invoke callback with exception and remove watcher + it.remove(); + watcher.callback.apply(e); + anyMatched = true; + continue; + } + + if (matched) { + it.remove(); + watcher.callback.apply(null); // null = success + anyMatched = true; + } + } + return anyMatched; + } + + /** + * Holds a condition and its associated callback for async await functionality. The condition is + * evaluated at the end of each event loop iteration. + */ + private static class ConditionWatcher { + final Supplier condition; + final Functions.Proc1 callback; + boolean canceled; + + ConditionWatcher(Supplier condition, Functions.Proc1 callback) { + this.condition = condition; + this.callback = callback; + } + + Functions.Proc getCancelHandler() { + return () -> canceled = true; + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 099b2f9b48..ade0c610f7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -520,6 +520,22 @@ public static boolean await(Duration timeout, String reason, Supplier u }); } + public static Promise awaitAsync(Supplier unblockCondition) { + assertNotReadOnly("awaitAsync"); + // Don't wrap the condition with setReadOnly here - the condition will be evaluated + // from the runner thread (not a workflow thread), so getRootWorkflowContext() won't work. + // SyncWorkflowContext.evaluateConditionWatchers handles setReadOnly directly. + return getWorkflowOutboundInterceptor().awaitAsync(unblockCondition); + } + + public static Promise awaitAsync(Duration timeout, Supplier unblockCondition) { + assertNotReadOnly("awaitAsync"); + // Don't wrap the condition with setReadOnly here - the condition will be evaluated + // from the runner thread (not a workflow thread), so getRootWorkflowContext() won't work. + // SyncWorkflowContext.evaluateConditionWatchers handles setReadOnly directly. + return getWorkflowOutboundInterceptor().awaitAsync(timeout, unblockCondition); + } + public static R sideEffect(Class resultClass, Type resultType, Func func) { assertNotReadOnly("side effect"); return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java index b8f8e251c6..c742aa40af 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java @@ -2,6 +2,7 @@ import io.temporal.common.RetryOptions; import io.temporal.internal.sync.AsyncInternal; +import io.temporal.internal.sync.WorkflowInternal; import java.time.Duration; import java.util.Optional; @@ -231,6 +232,38 @@ public static Promise retry( return AsyncInternal.retry(options, expiration, fn); } + /** + * Asynchronously wait until unblockCondition evaluates to true. + * + * @param unblockCondition condition that should return true to indicate completion. The condition + * is called on every state transition, so it should never call any blocking operations or + * contain code that mutates workflow state. + * @return Promise that completes when the condition becomes true, or completes exceptionally with + * CanceledFailure if the enclosing CancellationScope is canceled. + */ + public static Promise await(java.util.function.Supplier unblockCondition) { + return WorkflowInternal.awaitAsync(unblockCondition); + } + + /** + * Asynchronously wait until unblockCondition evaluates to true or timeout expires. + * + * @param timeout maximum time to wait for the condition + * @param unblockCondition condition that should return true to indicate completion. The condition + * is called on every state transition, so it should never call any blocking operations or + * contain code that mutates workflow state. + * @return Promise that completes with: + *
    + *
  • true if the condition was satisfied + *
  • false if the timeout expired before the condition was satisfied + *
  • exceptionally with CanceledFailure if the enclosing CancellationScope is canceled + *
+ */ + public static Promise await( + Duration timeout, java.util.function.Supplier unblockCondition) { + return WorkflowInternal.awaitAsync(timeout, unblockCondition); + } + /** Prohibits instantiation. */ private Async() {} } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index ba3a0eb332..d504efab21 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -963,4 +963,80 @@ public void testSupplierCalledMultipleWithoutCaching() { }); d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); } + + /** + * Test that beforeThreadsWakeUp callback is invoked BEFORE threads run. The callback sets a value + * that the thread reads, proving the callback ran first. + */ + @Test + public void testBeforeThreadsWakeUpCallbackInvokedBeforeThreads() { + AtomicBoolean valueSetByCallback = new AtomicBoolean(false); + AtomicBoolean threadSawValue = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + // Thread checks if callback set the value + threadSawValue.set(valueSetByCallback.get()); + status = "done"; + }, + null, + () -> { + // Callback sets value before threads run + valueSetByCallback.set(true); + return false; + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + assertTrue("Callback should set value before thread runs", valueSetByCallback.get()); + assertTrue("Thread should see value set by callback", threadSawValue.get()); + } + + /** + * Test that when beforeThreadsWakeUp returns true (progress made), the loop continues and threads + * run again. The callback can return true multiple times when notifying multiple conditions. + */ + @Test + public void testBeforeThreadsWakeUpProgressContinuesLoop() { + AtomicBoolean shouldUnblock1 = new AtomicBoolean(false); + AtomicBoolean shouldUnblock2 = new AtomicBoolean(false); + AtomicInteger trueCount = new AtomicInteger(0); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "waiting1"; + WorkflowThread.await("wait1", shouldUnblock1::get); + status = "waiting2"; + WorkflowThread.await("wait2", shouldUnblock2::get); + status = "done"; + }, + null, + () -> { + // Callback can return true multiple times - once for each condition it unblocks + if (status.equals("waiting1") && !shouldUnblock1.get()) { + shouldUnblock1.set(true); + trueCount.incrementAndGet(); + return true; + } + if (status.equals("waiting2") && !shouldUnblock2.get()) { + shouldUnblock2.set(true); + trueCount.incrementAndGet(); + return true; + } + return false; + }); + + // Single runUntilAllBlocked: callback returns true twice (once per condition), + // thread advances through both waits to completion + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + assertEquals("Callback should return true twice (once per condition)", 2, trueCount.get()); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java new file mode 100644 index 0000000000..65e98d2419 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java @@ -0,0 +1,291 @@ +package io.temporal.workflow; + +import static org.junit.Assert.*; + +import io.temporal.failure.CanceledFailure; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link Async#await} - the asynchronous, non-blocking version of {@link Workflow#await}. + */ +public class AsyncAwaitTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestAsyncAwaitWorkflow.class).build(); + + @Test + public void testBasicAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("basic"); + assertEquals("condition1-met condition2-met done", result); + } + + @Test + public void testConditionTrueImmediately() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("immediate"); + assertEquals("immediate-true", result); + } + + @Test + public void testMultipleAsyncAwaits() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("multi"); + assertTrue(result.contains("first")); + assertTrue(result.contains("second")); + assertTrue(result.contains("third")); + } + + @Test + public void testTimedAwaitConditionMetFirst() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-condition-first"); + assertEquals("condition-met:true", result); + } + + @Test + public void testTimedAwaitTimeoutFirst() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-timeout-first"); + assertEquals("timeout:false", result); + } + + @Test + public void testTimedAwaitConditionAlreadyTrue() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("already-true"); + assertEquals("already-true:true", result); + } + + @Test + public void testPromiseAnyOfAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("anyof"); + assertTrue(result.equals("first-won") || result.equals("second-won") || result.equals("both")); + } + + @Test + public void testPromiseAllOfAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("allof"); + assertEquals("all-completed", result); + } + + @Test + public void testAsyncAwaitChaining() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("chaining"); + assertEquals("chained-result:42", result); + } + + @Test + public void testAsyncAwaitCancellation() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("cancellation"); + assertEquals("cancelled", result); + } + + /** Combined workflow that handles all test scenarios. */ + public static class TestAsyncAwaitWorkflow implements TestWorkflow1 { + private boolean condition1 = false; + private boolean condition2 = false; + private int counter = 0; + private int value = 0; + + @Override + public String execute(String testCase) { + switch (testCase) { + case "basic": + return testBasic(); + case "immediate": + return testImmediate(); + case "multi": + return testMultiple(); + case "timed-condition-first": + return testTimedConditionFirst(); + case "timed-timeout-first": + return testTimedTimeoutFirst(); + case "already-true": + return testAlreadyTrue(); + case "anyof": + return testAnyOf(); + case "allof": + return testAllOf(); + case "chaining": + return testChaining(); + case "cancellation": + return testCancellation(); + default: + return "unknown test case"; + } + } + + private String testBasic() { + StringBuilder result = new StringBuilder(); + + Promise await1 = Async.await(() -> condition1); + Promise await2 = Async.await(() -> condition2); + + condition1 = true; + await1.get(); + result.append("condition1-met "); + + condition2 = true; + await2.get(); + result.append("condition2-met "); + + result.append("done"); + return result.toString(); + } + + private String testImmediate() { + Promise promise = Async.await(() -> true); + promise.get(); + return "immediate-true"; + } + + private String testMultiple() { + List results = new ArrayList<>(); + + Promise first = Async.await(() -> counter >= 1); + Promise second = Async.await(() -> counter >= 2); + Promise third = Async.await(() -> counter >= 3); + + first.thenApply( + v -> { + results.add("first"); + return null; + }); + second.thenApply( + v -> { + results.add("second"); + return null; + }); + third.thenApply( + v -> { + results.add("third"); + return null; + }); + + counter = 1; + Workflow.sleep(Duration.ofMillis(1)); + counter = 2; + Workflow.sleep(Duration.ofMillis(1)); + counter = 3; + + Promise.allOf(first, second, third).get(); + + return String.join(" ", results); + } + + private String testTimedConditionFirst() { + condition1 = false; + Promise promise = Async.await(Duration.ofSeconds(10), () -> condition1); + + Workflow.sleep(Duration.ofMillis(100)); + condition1 = true; + + boolean result = promise.get(); + return "condition-met:" + result; + } + + private String testTimedTimeoutFirst() { + Promise promise = Async.await(Duration.ofMillis(100), () -> false); + boolean result = promise.get(); + return "timeout:" + result; + } + + private String testAlreadyTrue() { + Promise promise = Async.await(Duration.ofSeconds(10), () -> true); + boolean result = promise.get(); + return "already-true:" + result; + } + + private String testAnyOf() { + condition1 = false; + condition2 = false; + + Promise first = Async.await(() -> condition1); + Promise second = Async.await(() -> condition2); + + condition1 = true; + + Promise.anyOf(first, second).get(); + + if (first.isCompleted() && !second.isCompleted()) { + return "first-won"; + } else if (second.isCompleted() && !first.isCompleted()) { + return "second-won"; + } else { + return "both"; + } + } + + private String testAllOf() { + condition1 = false; + condition2 = false; + + Promise await1 = Async.await(() -> condition1); + Promise await2 = Async.await(() -> condition2); + + condition1 = true; + Workflow.sleep(Duration.ofMillis(1)); + condition2 = true; + + Promise.allOf(await1, await2).get(); + return "all-completed"; + } + + private String testChaining() { + value = 0; + Promise chainedPromise = + Async.await(() -> value > 0) + .thenApply(v -> value * 2) + .handle( + (result, failure) -> { + if (failure != null) { + return -1; + } + return result; + }); + + value = 21; + + int result = chainedPromise.get(); + return "chained-result:" + result; + } + + private String testCancellation() { + condition1 = false; + final Promise[] promiseHolder = new Promise[1]; + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + // Create an async await that will never complete on its own + promiseHolder[0] = Async.await(() -> condition1); + }); + + // Run the scope (this is non-blocking since Async.await returns immediately) + scope.run(); + + // Cancel the scope + scope.cancel(); + + // The promise should fail with CanceledFailure when we try to get it + try { + promiseHolder[0].get(); + return "not-cancelled"; + } catch (CanceledFailure e) { + return "cancelled"; + } + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1221dec557..be3351dcb7 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -394,6 +394,16 @@ public void await(String reason, Supplier unblockCondition) { throw new UnsupportedOperationException("not implemented"); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Promise awaitAsync(Duration timeout, Supplier unblockCondition) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public Promise newTimer(Duration duration) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 77d82bdaee..ebf997f1f2 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -233,6 +233,22 @@ public void await(String reason, Supplier unblockCondition) { next.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("awaitAsync"); + } + return next.awaitAsync(unblockCondition); + } + + @Override + public Promise awaitAsync(Duration timeout, Supplier unblockCondition) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("awaitAsync " + timeout); + } + return next.awaitAsync(timeout, unblockCondition); + } + @Override public Promise newTimer(Duration duration) { if (!WorkflowUnsafe.isReplaying()) { From 4c6f565c9895b2a9a0c9be7384293bd11cc6901f Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 29 Dec 2025 13:36:27 -0800 Subject: [PATCH 2/3] Add tests for Async.await() timed cancellation and condition exceptions - Add testTimedAsyncAwaitCancellation: verifies cancellation of the timed Async.await(Duration, Supplier) variant via CancellationScope - Add testAsyncAwaitConditionThrows: verifies that exceptions thrown by conditions during subsequent evaluations complete the promise exceptionally --- .../io/temporal/workflow/AsyncAwaitTest.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java index 65e98d2419..254fa8bacb 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java @@ -92,6 +92,20 @@ public void testAsyncAwaitCancellation() { assertEquals("cancelled", result); } + @Test + public void testTimedAsyncAwaitCancellation() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-cancellation"); + assertEquals("timed-cancelled", result); + } + + @Test + public void testAsyncAwaitConditionThrows() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("condition-throws"); + assertEquals("caught:simulated error", result); + } + /** Combined workflow that handles all test scenarios. */ public static class TestAsyncAwaitWorkflow implements TestWorkflow1 { private boolean condition1 = false; @@ -122,6 +136,10 @@ public String execute(String testCase) { return testChaining(); case "cancellation": return testCancellation(); + case "timed-cancellation": + return testTimedCancellation(); + case "condition-throws": + return testConditionThrows(); default: return "unknown test case"; } @@ -287,5 +305,59 @@ private String testCancellation() { return "cancelled"; } } + + private String testTimedCancellation() { + condition1 = false; + final Promise[] promiseHolder = new Promise[1]; + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + // Create a timed async await that will never complete on its own + promiseHolder[0] = Async.await(Duration.ofHours(1), () -> condition1); + }); + + // Run the scope (this is non-blocking since Async.await returns immediately) + scope.run(); + + // Cancel the scope + scope.cancel(); + + // The promise should fail with CanceledFailure when we try to get it + try { + promiseHolder[0].get(); + return "timed-not-cancelled"; + } catch (CanceledFailure e) { + return "timed-cancelled"; + } + } + + private String testConditionThrows() { + // Start with condition that doesn't throw, but will throw on subsequent evaluation + // Initial check returns false (doesn't throw), then later evaluation throws + counter = 0; + + Promise promise = + Async.await( + () -> { + counter++; + // First evaluation (initial check) returns false + // Second evaluation (in evaluateConditionWatchers) throws + if (counter > 1) { + throw new RuntimeException("simulated error"); + } + return false; + }); + + // Trigger re-evaluation by sleeping (causes event loop iteration) + Workflow.sleep(Duration.ofMillis(1)); + + try { + promise.get(); + return "no-exception"; + } catch (RuntimeException e) { + return "caught:" + e.getMessage(); + } + } } } From cf0eb1ece8c8ef06448aad7cecea68de7dc62235 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 29 Dec 2025 13:56:36 -0800 Subject: [PATCH 3/3] Add cross-reference @see tags between related wait/sleep methods Link Workflow.sleep, Workflow.newTimer, Workflow.await, and Async.await in Javadoc to help developers discover blocking vs non-blocking alternatives. --- .../main/java/io/temporal/workflow/Async.java | 3 +++ .../java/io/temporal/workflow/Workflow.java | 17 +++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java index c742aa40af..007f65b418 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java @@ -240,6 +240,7 @@ public static Promise retry( * contain code that mutates workflow state. * @return Promise that completes when the condition becomes true, or completes exceptionally with * CanceledFailure if the enclosing CancellationScope is canceled. + * @see Workflow#await(java.util.function.Supplier) for a blocking version */ public static Promise await(java.util.function.Supplier unblockCondition) { return WorkflowInternal.awaitAsync(unblockCondition); @@ -258,6 +259,8 @@ public static Promise await(java.util.function.Supplier unblockCo *
  • false if the timeout expired before the condition was satisfied *
  • exceptionally with CanceledFailure if the enclosing CancellationScope is canceled * + * + * @see Workflow#await(Duration, java.util.function.Supplier) for a blocking version */ public static Promise await( Duration timeout, java.util.function.Supplier unblockCondition) { diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 61c787757f..9be9d73d80 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -474,6 +474,7 @@ public static CancellationScope newDetachedCancellationScope(Runnable runnable) * * @return feature that becomes ready when at least specified number of seconds passes. promise is * failed with {@link CanceledFailure} if enclosing scope is canceled. + * @see #sleep(Duration) for a blocking version */ public static Promise newTimer(Duration delay) { return WorkflowInternal.newTimer(delay); @@ -485,6 +486,7 @@ public static Promise newTimer(Duration delay) { * * @return feature that becomes ready when at least specified number of seconds passes. promise is * failed with {@link CanceledFailure} if enclosing scope is canceled. + * @see #sleep(Duration) for a blocking version */ public static Promise newTimer(Duration delay, TimerOptions options) { return WorkflowInternal.newTimer(delay, options); @@ -566,12 +568,20 @@ public static long currentTimeMillis() { return WorkflowInternal.currentTimeMillis(); } - /** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */ + /** + * Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. + * + * @see #newTimer(Duration) for a non-blocking version that returns a Promise + */ public static void sleep(Duration duration) { WorkflowInternal.sleep(duration); } - /** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */ + /** + * Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. + * + * @see #newTimer(Duration) for a non-blocking version that returns a Promise + */ public static void sleep(long millis) { WorkflowInternal.sleep(Duration.ofMillis(millis)); } @@ -585,6 +595,7 @@ public static void sleep(long millis) { * contain any time based conditions. Use {@link #await(Duration, Supplier)} for those * instead. * @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled). + * @see Async#await(java.util.function.Supplier) for a non-blocking version that returns a Promise */ public static void await(Supplier unblockCondition) { WorkflowInternal.await( @@ -606,6 +617,8 @@ public static void await(Supplier unblockCondition) { * Use timeout parameter for those. * @return false if timed out. * @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled). + * @see Async#await(Duration, java.util.function.Supplier) for a non-blocking version that returns + * a Promise */ public static boolean await(Duration timeout, Supplier unblockCondition) { return WorkflowInternal.await(