From 307bbfd979d0a615b437784aeeb0441041860284 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Tue, 14 Apr 2026 11:18:20 +0200 Subject: [PATCH 1/3] fix: detect shutdown and break DurableTaskGrpcWorker loop The worker loop ran `while (true)` and only exited if an InterruptedException happened during the 5-second retry sleep. If `close()` was called while the gRPC stream was blocking, the CANCELLED exception was logged but the loop kept retrying. - Replace `while (true)` with a check on `isNormalShutdown` and the thread interrupt flag so the loop exits promptly. - Break out of the retry path on CANCELLED when `isNormalShutdown` is set, avoiding a misleading 5-second sleep after `close()`. - Re-set the interrupt flag before breaking on InterruptedException to preserve the interrupt contract for callers higher up. Signed-off-by: Javier Aliaga --- .../java/io/dapr/durabletask/DurableTaskGrpcWorker.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 12f89bd727..d4db1a4cc6 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -171,7 +171,7 @@ public void startAndBlock() { this.dataConverter, logger); - while (true) { + while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) { try { OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest .newBuilder().build(); @@ -210,6 +210,9 @@ public void startAndBlock() { this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); + if (this.isNormalShutdown) { + break; + } } else { logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); @@ -219,6 +222,7 @@ public void startAndBlock() { try { Thread.sleep(5000); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); break; } } From e00a33e2cc6e67377278df1848bf11f6739f0852 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Tue, 14 Apr 2026 12:27:09 +0200 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?skip=20retry=20sleep=20on=20shutdown,=20add=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Capture workerThread in startAndBlock() so close() can interrupt the thread even when startAndBlock() is called directly (not via start()), fixing the case where the 5s sleep blocks shutdown. - Add isNormalShutdown guard before the retry sleep so any exception code (UNAVAILABLE, CANCELLED, etc.) exits promptly during shutdown. - Add DurableTaskGrpcWorkerShutdownTest with 3 scenarios: - start() + close() terminates the worker thread promptly - startAndBlock() on a separate thread exits on close() - startAndBlock() exits on thread interrupt Signed-off-by: Javier Aliaga --- .../durabletask/DurableTaskGrpcWorker.java | 5 + .../DurableTaskGrpcWorkerShutdownTest.java | 128 ++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index d4db1a4cc6..2ebc7de476 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -158,6 +158,7 @@ public void close() { * interrupt signal.

*/ public void startAndBlock() { + this.workerThread = Thread.currentThread(); logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress()); TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor( @@ -218,6 +219,10 @@ public void startAndBlock() { String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); } + if (this.isNormalShutdown) { + break; + } + // Retry after 5 seconds try { Thread.sleep(5000); diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java new file mode 100644 index 0000000000..aa7a45606f --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for DurableTaskGrpcWorker shutdown behavior. + */ +public class DurableTaskGrpcWorkerShutdownTest { + + /** + * Verifies that calling close() on a worker that was started via start() + * causes the worker thread to terminate promptly (within a bounded time), + * rather than hanging in the retry loop. + */ + @Test + void workerThreadTerminatesPromptlyOnClose() throws Exception { + // Use an arbitrary port where no sidecar is running — the worker will + // enter the retry loop (UNAVAILABLE → sleep 5s → retry). + DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + .port(19876) + .build(); + + worker.start(); + + // Give the worker thread time to enter the retry loop + Thread.sleep(500); + + Instant before = Instant.now(); + worker.close(); + + // Wait for the worker thread to finish — the join is bounded so the + // test doesn't hang if the fix regresses. + Thread workerThread = getWorkerThread(worker); + if (workerThread != null) { + workerThread.join(Duration.ofSeconds(3).toMillis()); + assertFalse(workerThread.isAlive(), + "Worker thread should have terminated after close()"); + } + + Duration elapsed = Duration.between(before, Instant.now()); + assertTrue(elapsed.toMillis() < 3000, + "close() should return promptly, but took " + elapsed.toMillis() + "ms"); + } + + /** + * Verifies that calling close() on a worker that was started via + * startAndBlock() on a separate thread terminates that thread promptly. + */ + @Test + void startAndBlockExitsOnClose() throws Exception { + DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + .port(19877) + .build(); + + Thread blockingThread = new Thread(worker::startAndBlock); + blockingThread.start(); + + // Give the blocking thread time to enter the retry loop + Thread.sleep(500); + + Instant before = Instant.now(); + worker.close(); + + blockingThread.join(Duration.ofSeconds(3).toMillis()); + assertFalse(blockingThread.isAlive(), + "startAndBlock() thread should have terminated after close()"); + + Duration elapsed = Duration.between(before, Instant.now()); + assertTrue(elapsed.toMillis() < 3000, + "close() should terminate startAndBlock() promptly, but took " + elapsed.toMillis() + "ms"); + } + + /** + * Verifies that interrupting the thread running startAndBlock() causes it + * to exit and preserves the interrupt status. + */ + @Test + void startAndBlockExitsOnInterrupt() throws Exception { + DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + .port(19878) + .build(); + + Thread blockingThread = new Thread(worker::startAndBlock); + blockingThread.start(); + + // Give the blocking thread time to enter the retry loop + Thread.sleep(500); + + blockingThread.interrupt(); + blockingThread.join(Duration.ofSeconds(3).toMillis()); + + assertFalse(blockingThread.isAlive(), + "startAndBlock() thread should have exited after interrupt"); + + worker.close(); + } + + private Thread getWorkerThread(DurableTaskGrpcWorker worker) { + try { + java.lang.reflect.Field f = DurableTaskGrpcWorker.class.getDeclaredField("workerThread"); + f.setAccessible(true); + return (Thread) f.get(worker); + } catch (Exception e) { + return null; + } + } +} From 04b7a117759101ad05b0a385e3ceac1393f55e36 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Tue, 14 Apr 2026 15:33:47 +0200 Subject: [PATCH 3/3] fix: address second round of PR review comments - Mark workerThread as volatile for cross-thread visibility - Remove unused imports (ManagedChannel, ManagedChannelBuilder) - Fail test explicitly when reflection fails instead of silently returning null - Assert interrupt status is preserved in startAndBlockExitsOnInterrupt Signed-off-by: Javier Aliaga --- .../durabletask/DurableTaskGrpcWorker.java | 8 ++++---- .../DurableTaskGrpcWorkerShutdownTest.java | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 2ebc7de476..89fe85c8af 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -64,7 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; - private Thread workerThread; + private volatile Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories = builder.orchestrationFactories; @@ -178,6 +178,9 @@ public void startAndBlock() { .newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { + if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) { + break; + } OrchestratorService.WorkItem workItem = workItemStream.next(); OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); @@ -211,9 +214,6 @@ public void startAndBlock() { this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); - if (this.isNormalShutdown) { - break; - } } else { logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java index aa7a45606f..71368af4be 100644 --- a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java @@ -13,15 +13,15 @@ package io.dapr.durabletask; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Unit tests for DurableTaskGrpcWorker shutdown behavior. @@ -52,11 +52,10 @@ void workerThreadTerminatesPromptlyOnClose() throws Exception { // Wait for the worker thread to finish — the join is bounded so the // test doesn't hang if the fix regresses. Thread workerThread = getWorkerThread(worker); - if (workerThread != null) { - workerThread.join(Duration.ofSeconds(3).toMillis()); - assertFalse(workerThread.isAlive(), - "Worker thread should have terminated after close()"); - } + assertNotNull(workerThread, "Worker thread should be accessible via reflection"); + workerThread.join(Duration.ofSeconds(3).toMillis()); + assertFalse(workerThread.isAlive(), + "Worker thread should have terminated after close()"); Duration elapsed = Duration.between(before, Instant.now()); assertTrue(elapsed.toMillis() < 3000, @@ -112,6 +111,8 @@ void startAndBlockExitsOnInterrupt() throws Exception { assertFalse(blockingThread.isAlive(), "startAndBlock() thread should have exited after interrupt"); + assertTrue(blockingThread.isInterrupted(), + "Interrupt status should be preserved after startAndBlock() exits"); worker.close(); } @@ -122,7 +123,8 @@ private Thread getWorkerThread(DurableTaskGrpcWorker worker) { f.setAccessible(true); return (Thread) f.get(worker); } catch (Exception e) { - return null; + fail("Failed to access workerThread field via reflection: " + e.getMessage()); + return null; // unreachable } } }