Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,26 @@ public DynamicUpdateHandler getHandler() {

void await(String reason, Supplier<Boolean> unblockCondition);

/**
* Asynchronously wait until unblockCondition evaluates to true.
*
* @param unblockCondition condition that should return true to indicate completion
* @return Promise that completes when the condition becomes true, or completes exceptionally with
* CanceledFailure if the enclosing CancellationScope is canceled
*/
Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition);

/**
* Asynchronously wait until unblockCondition evaluates to true or timeout expires.
*
* @param timeout maximum time to wait for the condition
* @param unblockCondition condition that should return true to indicate completion
* @return Promise that completes with true if the condition was satisfied, false if the timeout
* expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is
* canceled
*/
Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCondition);

Promise<Void> newTimer(Duration duration);

Promise<Void> newTimer(Duration duration, TimerOptions options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ public void await(String reason, Supplier<Boolean> unblockCondition) {
next.await(reason, unblockCondition);
}

@Override
public Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition) {
return next.awaitAsync(unblockCondition);
}

@Override
public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCondition) {
return next.awaitAsync(timeout, unblockCondition);
}

@Override
public Promise<Void> newTimer(Duration duration) {
return next.newTimer(duration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.workflow.CancellationScope;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -29,7 +30,7 @@ static DeterministicRunner newRunner(
SyncWorkflowContext workflowContext,
Runnable root,
WorkflowExecutorCache cache) {
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache);
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache, null);
}

/**
Expand All @@ -44,7 +45,29 @@ static DeterministicRunner newRunner(
WorkflowThreadExecutor workflowThreadExecutor,
SyncWorkflowContext workflowContext,
Runnable root) {
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null);
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null, null);
}

/**
* Create new instance of DeterministicRunner with a callback invoked before threads wake up.
*
* @param workflowThreadExecutor executor for workflow thread Runnables
* @param workflowContext workflow context to use
* @param root function that root thread of the runner executes.
* @param cache WorkflowExecutorCache used cache inflight workflows
* @param beforeThreadsWakeUp callback invoked once per loop iteration before threads run. Returns
* true if progress was made (e.g., a condition watcher fired), which causes the loop to
* continue even if all threads are blocked. Returns false if no progress was made.
* @return instance of the DeterministicRunner.
*/
static DeterministicRunner newRunner(
WorkflowThreadExecutor workflowThreadExecutor,
SyncWorkflowContext workflowContext,
Runnable root,
WorkflowExecutorCache cache,
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
return new DeterministicRunnerImpl(
workflowThreadExecutor, workflowContext, root, cache, beforeThreadsWakeUp);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class DeterministicRunnerImpl implements DeterministicRunner {
// always accessed under the runner lock
private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();

// Callback invoked before threads wake up in each event loop iteration
@Nullable private final Supplier<Boolean> beforeThreadsWakeUp;

// Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be
// synchronized.
// Inside DeterministicRunner the access to these variables is under the runner lock.
Expand Down Expand Up @@ -144,20 +147,30 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
WorkflowThreadExecutor workflowThreadExecutor,
@Nonnull SyncWorkflowContext workflowContext,
Runnable root) {
this(workflowThreadExecutor, workflowContext, root, null);
this(workflowThreadExecutor, workflowContext, root, null, null);
}

DeterministicRunnerImpl(
WorkflowThreadExecutor workflowThreadExecutor,
@Nonnull SyncWorkflowContext workflowContext,
Runnable root,
WorkflowExecutorCache cache) {
this(workflowThreadExecutor, workflowContext, root, cache, null);
}

DeterministicRunnerImpl(
WorkflowThreadExecutor workflowThreadExecutor,
@Nonnull SyncWorkflowContext workflowContext,
Runnable root,
WorkflowExecutorCache cache,
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
this.workflowThreadExecutor = workflowThreadExecutor;
this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext");
// TODO this should be refactored, publishing of this in an constructor into external objects is
// a bad practice
this.workflowContext.setRunner(this);
this.cache = cache;
this.beforeThreadsWakeUp = beforeThreadsWakeUp;
boolean deterministicCancellationScopeOrder =
workflowContext
.getReplayContext()
Expand Down Expand Up @@ -208,7 +221,16 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
appendCallbackThreadsLocked();
}
toExecuteInWorkflowThread.clear();
progress = false;

// Invoke beforeThreadsWakeUp callback BEFORE running threads.
// This allows the callback to evaluate conditions and complete promises,
// ensuring threads see updated state when they wake up.
if (beforeThreadsWakeUp != null) {
progress = beforeThreadsWakeUp.get();
} else {
progress = false;
}

Iterator<WorkflowThread> ci = threads.iterator();
while (ci.hasNext()) {
WorkflowThread c = ci.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
context.getWorkflowExecution()))
.start();
},
cache);
cache,
workflowContext.getBeforeThreadsWakeUpCallback());
}

@Override
Expand Down
Loading
Loading