Skip to content

Commit 95feef7

Browse files
committed
Allow infinite retries for workflows
Signed-off-by: Matheus André <92062874+matheusandre1@users.noreply.github.com>
1 parent fa4a291 commit 95feef7

File tree

5 files changed

+96
-8
lines changed

5 files changed

+96
-8
lines changed

examples/workflow/simple.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
child_orchestrator_count = 0
3232
child_orchestrator_string = ''
3333
child_act_retry_count = 0
34+
infinite_retry_count = 0
3435
instance_id = 'exampleInstanceID'
3536
child_instance_id = 'childInstanceID'
3637
workflow_name = 'hello_world_wf'
@@ -48,6 +49,11 @@
4849
retry_timeout=timedelta(seconds=100),
4950
)
5051

52+
infinite_retry_policy = RetryPolicy(
53+
first_retry_interval=timedelta(seconds=1),
54+
max_number_of_attempts=-1,
55+
)
56+
5157
wfr = WorkflowRuntime()
5258

5359

@@ -57,6 +63,7 @@ def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
5763
yield ctx.call_activity(hello_act, input=1)
5864
yield ctx.call_activity(hello_act, input=10)
5965
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
66+
yield ctx.call_activity(hello_infinite_retryable_act, retry_policy=infinite_retry_policy)
6067
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
6168

6269
# Change in event handling: Use when_any to handle both event and timeout
@@ -91,6 +98,15 @@ def hello_retryable_act(ctx: WorkflowActivityContext):
9198
retry_count += 1
9299

93100

101+
@wfr.activity(name='hello_infinite_retryable_act')
102+
def hello_infinite_retryable_act(ctx: WorkflowActivityContext):
103+
global infinite_retry_count
104+
infinite_retry_count += 1
105+
print(f'Infinite retry attempt: {infinite_retry_count}', flush=True)
106+
if infinite_retry_count <= 10:
107+
raise ValueError('Retryable Error')
108+
109+
94110
@wfr.workflow(name='child_retryable_wf')
95111
def child_retryable_wf(ctx: DaprWorkflowContext):
96112
global child_orchestrator_string, child_orchestrator_count
@@ -128,11 +144,12 @@ def main():
128144

129145
wf_client.wait_for_workflow_start(instance_id)
130146

131-
# Sleep to let the workflow run initial activities
132-
sleep(12)
147+
# Sleep to let the workflow run initial activities and infinite retries
148+
sleep(24)
133149

134150
assert counter == 11
135151
assert retry_count == 2
152+
assert infinite_retry_count == 11
136153
assert child_orchestrator_string == '1aa2bb3cc'
137154

138155
# Pause Test
@@ -153,10 +170,13 @@ def main():
153170
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
154171
if state.runtime_status.name == 'COMPLETED':
155172
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
173+
assert state.serialized_output.strip('"') == 'Completed'
156174
else:
157175
print(f'Workflow failed! Status: {state.runtime_status.name}')
176+
raise AssertionError(f'Unexpected workflow status: {state.runtime_status.name}')
158177
except TimeoutError:
159178
print('*** Workflow timed out!')
179+
raise
160180

161181
wf_client.purge_workflow(instance_id=instance_id)
162182
try:

ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(
4040
Args:
4141
first_retry_interval(timedelta): The retry interval to use for the first retry attempt.
4242
max_number_of_attempts(int): The maximum number of retry attempts.
43+
Use ``-1`` for infinite retries.
4344
backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating
4445
the next retry interval.
4546
max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any
@@ -50,8 +51,8 @@ def __init__(
5051
# validate inputs
5152
if first_retry_interval < timedelta(seconds=0):
5253
raise ValueError('first_retry_interval must be >= 0')
53-
if max_number_of_attempts < 1:
54-
raise ValueError('max_number_of_attempts must be >= 1')
54+
if max_number_of_attempts == 0 or max_number_of_attempts < -1:
55+
raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries')
5556
if backoff_coefficient is not None and backoff_coefficient < 1:
5657
raise ValueError('backoff_coefficient must be >= 1')
5758
if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0):

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,10 @@ def start(self):
246246
try:
247247
is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout)
248248
if not is_ready:
249-
raise RuntimeError('WorkflowRuntime worker and its stream are not ready')
249+
self._logger.warning(
250+
'WorkflowRuntime worker and its stream are not ready. '
251+
'Continuing; workflows scheduled immediately may not be received.'
252+
)
250253
else:
251254
self._logger.debug(
252255
'WorkflowRuntime worker is ready and its stream can receive work items'
ext/dapr-ext-workflow/tests/test_retry_policy.py (filename missing in page capture — inferred from the RetryPolicy test content; verify against the commit)

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2026 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
import unittest
17+
from datetime import timedelta
18+
19+
from dapr.ext.workflow.retry_policy import RetryPolicy
20+
21+
22+
class RetryPolicyTests(unittest.TestCase):
23+
@staticmethod
24+
def _execute_with_retry(operation, retry_policy: RetryPolicy):
25+
attempts = 0
26+
while True:
27+
attempts += 1
28+
try:
29+
operation(attempts)
30+
return attempts
31+
except ValueError:
32+
if (
33+
retry_policy.max_number_of_attempts != -1
34+
and attempts >= retry_policy.max_number_of_attempts
35+
):
36+
raise
37+
38+
def test_allow_infinite_max_number_of_attempts(self):
39+
retry_policy = RetryPolicy(
40+
first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1
41+
)
42+
43+
self.assertEqual(-1, retry_policy.max_number_of_attempts)
44+
45+
def test_infinite_retries_succeeds_after_ten_failures(self):
46+
retry_policy = RetryPolicy(
47+
first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1
48+
)
49+
50+
def flaky_operation(attempt: int):
51+
if attempt <= 10:
52+
raise ValueError('Retryable Error')
53+
54+
attempts = self._execute_with_retry(flaky_operation, retry_policy=retry_policy)
55+
56+
self.assertEqual(11, attempts)
57+
58+
def test_reject_invalid_max_number_of_attempts(self):
59+
with self.assertRaises(ValueError):
60+
RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=0)
61+
62+
with self.assertRaises(ValueError):
63+
RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-2)

ext/dapr-ext-workflow/tests/test_workflow_runtime.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,17 +246,18 @@ def test_start_logs_exception_when_worker_start_fails(self):
246246
mock_exception.assert_called_once()
247247
self.assertIn('did not start', mock_exception.call_args[0][0])
248248

249-
def test_start_raises_when_worker_not_ready(self):
249+
def test_start_logs_warning_when_worker_not_ready(self):
250250
listActivities.clear()
251251
listOrchestrators.clear()
252252
mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
253253
runtime = WorkflowRuntime(worker_ready_timeout=0.2)
254254
mock_worker = mock.MagicMock()
255255
mock_worker.is_worker_ready.return_value = False
256256
runtime._WorkflowRuntime__worker = mock_worker
257-
with self.assertRaises(RuntimeError) as ctx:
257+
with mock.patch.object(runtime._logger, 'warning') as mock_warning:
258258
runtime.start()
259-
self.assertIn('not ready', str(ctx.exception))
259+
mock_worker.start.assert_called_once()
260+
self.assertGreaterEqual(mock_warning.call_count, 1)
260261

261262
def test_start_logs_warning_when_no_is_worker_ready(self):
262263
mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry'])

0 commit comments

Comments (0)