1313
1414package io .dapr .durabletask ;
1515
16+ import com .google .protobuf .Empty ;
1617import io .dapr .durabletask .implementation .protobuf .OrchestratorService ;
1718import io .dapr .durabletask .implementation .protobuf .TaskHubSidecarServiceGrpc ;
1819import io .dapr .durabletask .orchestration .TaskOrchestrationFactories ;
3536import java .util .Map ;
3637import java .util .concurrent .ExecutorService ;
3738import java .util .concurrent .Executors ;
39+ import java .util .concurrent .ScheduledExecutorService ;
3840import java .util .concurrent .TimeUnit ;
3941import java .util .logging .Level ;
4042import java .util .logging .Logger ;
@@ -48,6 +50,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4850 private static final int DEFAULT_PORT = 4001 ;
4951 private static final Logger logger = Logger .getLogger (DurableTaskGrpcWorker .class .getPackage ().getName ());
5052 private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration .ofDays (3 );
53+ private static final long KEEPALIVE_INTERVAL_SECONDS = 30 ;
5154
5255 private final TaskOrchestrationFactories orchestrationFactories ;
5356
@@ -63,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6366 private final TaskHubSidecarServiceGrpc .TaskHubSidecarServiceBlockingStub sidecarClient ;
6467 private final boolean isExecutorServiceManaged ;
6568 private volatile boolean isNormalShutdown = false ;
69+ private ScheduledExecutorService keepaliveScheduler ;
6670 private Thread workerThread ;
6771
6872 DurableTaskGrpcWorker (DurableTaskGrpcWorkerBuilder builder ) {
@@ -136,6 +140,7 @@ public void close() {
136140 this .workerThread .interrupt ();
137141 }
138142 this .isNormalShutdown = true ;
143+ this .stopKeepaliveLoop ();
139144 this .shutDownWorkerPool ();
140145 this .closeSideCarChannel ();
141146 }
@@ -175,33 +180,38 @@ public void startAndBlock() {
175180 OrchestratorService .GetWorkItemsRequest getWorkItemsRequest = OrchestratorService .GetWorkItemsRequest
176181 .newBuilder ().build ();
177182 Iterator <OrchestratorService .WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
178- while (workItemStream .hasNext ()) {
179- OrchestratorService .WorkItem workItem = workItemStream .next ();
180- OrchestratorService .WorkItem .RequestCase requestType = workItem .getRequestCase ();
181-
182- if (requestType == OrchestratorService .WorkItem .RequestCase .ORCHESTRATORREQUEST ) {
183- OrchestratorService .OrchestratorRequest orchestratorRequest = workItem .getOrchestratorRequest ();
184- logger .log (Level .FINEST ,
185- String .format ("Processing orchestrator request for instance: {0}" ,
186- orchestratorRequest .getInstanceId ()));
187-
188- this .workerPool .submit (new OrchestratorRunner (workItem , taskOrchestrationExecutor , sidecarClient , tracer ));
189- } else if (requestType == OrchestratorService .WorkItem .RequestCase .ACTIVITYREQUEST ) {
190- OrchestratorService .ActivityRequest activityRequest = workItem .getActivityRequest ();
191-
192- logger .log (Level .INFO ,
193- String .format ("Processing activity request: %s for instance: %s, gRPC thread context: %s" ,
194- activityRequest .getName (),
195- activityRequest .getOrchestrationInstance ().getInstanceId (),
196- Context .current ()));
197-
198- this .workerPool .submit (new ActivityRunner (workItem , taskActivityExecutor , sidecarClient , tracer ));
199-
200- } else {
201- logger .log (Level .WARNING ,
202- "Received and dropped an unknown '{0}' work-item from the sidecar." ,
203- requestType );
183+ startKeepaliveLoop ();
184+ try {
185+ while (workItemStream .hasNext ()) {
186+ OrchestratorService .WorkItem workItem = workItemStream .next ();
187+ OrchestratorService .WorkItem .RequestCase requestType = workItem .getRequestCase ();
188+
189+ if (requestType == OrchestratorService .WorkItem .RequestCase .ORCHESTRATORREQUEST ) {
190+ OrchestratorService .OrchestratorRequest orchestratorRequest = workItem .getOrchestratorRequest ();
191+ logger .log (Level .FINEST ,
192+ String .format ("Processing orchestrator request for instance: {0}" ,
193+ orchestratorRequest .getInstanceId ()));
194+
195+ this .workerPool .submit (new OrchestratorRunner (workItem , taskOrchestrationExecutor , sidecarClient , tracer ));
196+ } else if (requestType == OrchestratorService .WorkItem .RequestCase .ACTIVITYREQUEST ) {
197+ OrchestratorService .ActivityRequest activityRequest = workItem .getActivityRequest ();
198+
199+ logger .log (Level .INFO ,
200+ String .format ("Processing activity request: %s for instance: %s, gRPC thread context: %s" ,
201+ activityRequest .getName (),
202+ activityRequest .getOrchestrationInstance ().getInstanceId (),
203+ Context .current ()));
204+
205+ this .workerPool .submit (new ActivityRunner (workItem , taskActivityExecutor , sidecarClient , tracer ));
206+
207+ } else {
208+ logger .log (Level .WARNING ,
209+ "Received and dropped an unknown '{0}' work-item from the sidecar." ,
210+ requestType );
211+ }
204212 }
213+ } finally {
214+ stopKeepaliveLoop ();
205215 }
206216 } catch (StatusRuntimeException e ) {
207217 if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
@@ -262,6 +272,46 @@ private void shutDownWorkerPool() {
262272 }
263273 }
264274
275+ /**
276+ * Starts a background keepalive loop to keep the gRPC connection alive.
277+ * This is an application-level keepalive to prevent AWS ALBs from
278+ * killing idle HTTP/2 connections.
279+ */
280+ private synchronized void startKeepaliveLoop () {
281+ stopKeepaliveLoop ();
282+ ScheduledExecutorService scheduler = Executors .newSingleThreadScheduledExecutor (r -> {
283+ Thread t = new Thread (r , "durabletask-keepalive" );
284+ t .setDaemon (true );
285+ return t ;
286+ });
287+ try {
288+ scheduler .scheduleWithFixedDelay (() -> {
289+ try {
290+ this .sidecarClient
291+ .withDeadlineAfter (5 , TimeUnit .SECONDS )
292+ .hello (Empty .getDefaultInstance ());
293+ } catch (StatusRuntimeException e ) {
294+ logger .log (Level .FINE , "keepalive failed" , e );
295+ }
296+ }, KEEPALIVE_INTERVAL_SECONDS , KEEPALIVE_INTERVAL_SECONDS , TimeUnit .SECONDS );
297+ } catch (RuntimeException e ) {
298+ scheduler .shutdownNow ();
299+ throw e ;
300+ }
301+ this .keepaliveScheduler = scheduler ;
302+ }
303+
304+ /**
305+ * Stops the background keepalive loop if one is running.
306+ */
307+ private synchronized void stopKeepaliveLoop () {
308+ ScheduledExecutorService scheduler = this .keepaliveScheduler ;
309+ this .keepaliveScheduler = null ;
310+ if (scheduler != null ) {
311+ scheduler .shutdownNow ();
312+ }
313+ }
314+
265315 private String getSidecarAddress () {
266316 return this .sidecarClient .getChannel ().authority ();
267317 }
0 commit comments