diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 12f89bd727..89fe85c8af 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -64,7 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; - private Thread workerThread; + private volatile Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories = builder.orchestrationFactories; @@ -158,6 +158,7 @@ public void close() { * interrupt signal.

*/ public void startAndBlock() { + this.workerThread = Thread.currentThread(); logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress()); TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor( @@ -171,12 +172,15 @@ public void startAndBlock() { this.dataConverter, logger); - while (true) { + while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) { try { OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest .newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { + if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) { + break; + } OrchestratorService.WorkItem workItem = workItemStream.next(); OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); @@ -215,10 +219,15 @@ public void startAndBlock() { String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); } + if (this.isNormalShutdown) { + break; + } + // Retry after 5 seconds try { Thread.sleep(5000); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); break; } } diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java new file mode 100644 index 0000000000..71368af4be --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.durabletask;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests for DurableTaskGrpcWorker shutdown behavior.
+ */
+public class DurableTaskGrpcWorkerShutdownTest {
+
+  /**
+   * Verifies that calling close() on a worker that was started via start()
+   * causes the worker thread to terminate promptly (within a bounded time),
+   * rather than hanging in the retry loop.
+   */
+  @Test
+  void workerThreadTerminatesPromptlyOnClose() throws Exception {
+    // Use an arbitrary port where no sidecar is running — the worker will
+    // enter the retry loop (UNAVAILABLE → sleep 5s → retry).
+    DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().port(19876).build();
+
+    worker.start();
+
+    // Give the worker thread time to enter the retry loop
+    Thread.sleep(500);
+
+    Instant before = Instant.now();
+    worker.close();
+
+    // Wait for the worker thread to finish — the join is bounded so the
+    // test doesn't hang if the fix regresses.
+    Thread workerThread = getWorkerThread(worker);
+    assertNotNull(workerThread, "Worker thread should be accessible via reflection");
+    workerThread.join(Duration.ofSeconds(3).toMillis());
+    assertFalse(workerThread.isAlive(),
+        "Worker thread should have terminated after close()");
+
+    Duration elapsed = Duration.between(before, Instant.now());
+    assertTrue(elapsed.toMillis() < 3000,
+        "close() should return promptly, but took " + elapsed.toMillis() + "ms");
+  }
+
+  /**
+   * Verifies that calling close() on a worker that was started via
+   * startAndBlock() on a separate thread terminates that thread promptly.
+   */
+  @Test
+  void startAndBlockExitsOnClose() throws Exception {
+    DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().port(19877).build();
+
+    Thread blockingThread = new Thread(worker::startAndBlock);
+    blockingThread.start();
+
+    // Give the blocking thread time to enter the retry loop
+    Thread.sleep(500);
+
+    Instant before = Instant.now();
+    worker.close();
+
+    blockingThread.join(Duration.ofSeconds(3).toMillis());
+    assertFalse(blockingThread.isAlive(),
+        "startAndBlock() thread should have terminated after close()");
+
+    Duration elapsed = Duration.between(before, Instant.now());
+    assertTrue(elapsed.toMillis() < 3000,
+        "close() should terminate startAndBlock() promptly, but took " + elapsed.toMillis() + "ms");
+  }
+
+  /**
+   * Verifies that interrupting the thread running startAndBlock() causes it
+   * to exit and preserves the interrupt status. The status is sampled on the
+   * worker thread itself: isInterrupted() on a terminated thread is JDK-dependent.
+   */
+  @Test
+  void startAndBlockExitsOnInterrupt() throws Exception {
+    DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().port(19878).build();
+
+    AtomicBoolean interruptedOnExit = new AtomicBoolean(false);
+    Thread blockingThread = new Thread(() -> {
+      worker.startAndBlock();
+      interruptedOnExit.set(Thread.currentThread().isInterrupted());
+    });
+    blockingThread.start();
+
+    // Give the blocking thread time to enter the retry loop
+    Thread.sleep(500);
+
+    blockingThread.interrupt();
+    blockingThread.join(Duration.ofSeconds(3).toMillis());
+
+    assertFalse(blockingThread.isAlive(),
+        "startAndBlock() thread should have exited after interrupt");
+    assertTrue(interruptedOnExit.get(),
+        "Interrupt status should be preserved after startAndBlock() exits");
+
+    worker.close();
+  }
+
+  private Thread getWorkerThread(DurableTaskGrpcWorker worker) {
+    try {
+      java.lang.reflect.Field f = DurableTaskGrpcWorker.class.getDeclaredField("workerThread");
+      f.setAccessible(true);
+      return (Thread) f.get(worker);
+    } catch (Exception e) {
+      fail("Failed to access workerThread field via reflection: " + e.getMessage());
+      return null; // unreachable
+    }
+  }
+}