Skip to content

Commit 079d6e1

Browse files
committed
Implement work item filtering
1 parent 60ecc95 commit 079d6e1

File tree

10 files changed

+1311
-15
lines changed

10 files changed

+1311
-15
lines changed

.vscode/mcp.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"inputs": [
3+
{
4+
"id": "ado_org",
5+
"type": "promptString",
6+
"description": "Azure DevOps organization name (e.g. 'msazure')"
7+
}
8+
],
9+
"servers": {
10+
"ado": {
11+
"type": "stdio",
12+
"command": "npx",
13+
"args": ["-y", "@azure-devops/mcp", "msazure"]
14+
}
15+
}
16+
}

docs/features.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,69 @@ with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_cha
166166

167167
**NOTE**
168168
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.
169+
170+
### Work item filtering
171+
172+
By default a worker receives **all** work items from the backend,
173+
regardless of which orchestrations, activities, or entities are
174+
registered. Work item filtering lets you explicitly tell the backend
175+
which work items a worker can handle so that only matching items are
176+
dispatched. This is useful when running multiple specialized workers
177+
against the same task hub.
178+
179+
Work item filtering is **opt-in**. Call `use_work_item_filters()` on
180+
the worker before starting it.
181+
182+
#### Auto-generated filters
183+
184+
Calling `use_work_item_filters()` with no arguments builds filters
185+
automatically from the worker's registry at start time:
186+
187+
```python
188+
with DurableTaskSchedulerWorker(...) as w:
189+
w.add_orchestrator(my_orchestrator)
190+
w.add_activity(my_activity)
191+
w.use_work_item_filters() # auto-generate from registry
192+
w.start()
193+
```
194+
195+
When versioning is configured with `VersionMatchStrategy.STRICT`,
196+
the worker's version is included in every filter so the backend
197+
only dispatches work items that match that exact version.
198+
199+
#### Explicit filters
200+
201+
Pass a `WorkItemFilters` instance for fine-grained control:
202+
203+
```python
204+
from durabletask.worker import (
205+
WorkItemFilters,
206+
OrchestrationWorkItemFilter,
207+
ActivityWorkItemFilter,
208+
EntityWorkItemFilter,
209+
)
210+
211+
w.use_work_item_filters(WorkItemFilters(
212+
orchestrations=[
213+
OrchestrationWorkItemFilter(name="my_orch", versions=["2.0.0"]),
214+
],
215+
activities=[
216+
ActivityWorkItemFilter(name="my_activity"),
217+
],
218+
entities=[
219+
EntityWorkItemFilter(name="my_entity"),
220+
],
221+
))
222+
```
223+
224+
#### Clearing filters
225+
226+
Pass `None` to clear any previously configured filters and return
227+
to the default behaviour of processing all work items:
228+
229+
```python
230+
w.use_work_item_filters(None)
231+
```
232+
233+
See the full
234+
[work item filtering sample](../examples/work_item_filtering.py).

docs/supported-patterns.md

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,49 @@ def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
118118
return "Success"
119119
```
120120

121-
See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
121+
See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
122+
123+
### Work item filtering
124+
125+
When running multiple workers against the same task hub, each
126+
worker can declare which work items it handles. The backend then
127+
dispatches only the matching orchestrations, activities, and
128+
entities, avoiding unnecessary round-trips. Filtering is opt-in
129+
and supports both auto-generated and explicit filter sets.
130+
131+
The simplest approach auto-generates filters from the worker's
132+
registry:
133+
134+
```python
135+
with DurableTaskSchedulerWorker(...) as w:
136+
w.add_orchestrator(greeting_orchestrator)
137+
w.add_activity(greet)
138+
w.use_work_item_filters() # auto-generate from registry
139+
w.start()
140+
```
141+
142+
For more control you can provide explicit filters, including
143+
version constraints:
144+
145+
```python
146+
from durabletask.worker import (
147+
WorkItemFilters,
148+
OrchestrationWorkItemFilter,
149+
ActivityWorkItemFilter,
150+
)
151+
152+
w.use_work_item_filters(WorkItemFilters(
153+
orchestrations=[
154+
OrchestrationWorkItemFilter(
155+
name="greeting_orchestrator",
156+
versions=["2.0.0"],
157+
),
158+
],
159+
activities=[
160+
ActivityWorkItemFilter(name="greet"),
161+
],
162+
))
163+
```
164+
165+
See the full
166+
[work item filtering sample](../examples/work_item_filtering.py).

durabletask/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
"""Durable Task SDK for Python"""
55

6-
from durabletask.worker import ConcurrencyOptions, VersioningOptions
6+
from durabletask.worker import ConcurrencyOptions, VersioningOptions, WorkItemFilters
77

8-
__all__ = ["ConcurrencyOptions", "VersioningOptions"]
8+
__all__ = ["ConcurrencyOptions", "VersioningOptions", "WorkItemFilters"]
99

1010
PACKAGE_NAME = "durabletask"

durabletask/testing/in_memory_backend.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import durabletask.internal.orchestrator_service_pb2 as pb
2727
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
2828
import durabletask.internal.helpers as helpers
29+
from durabletask.entities.entity_instance_id import EntityInstanceId
2930

3031

3132
@dataclass
@@ -436,16 +437,29 @@ def RestartInstance(self, request: pb.RestartInstanceRequest, context):
436437
f"Restarted instance '{request.instanceId}' as '{new_instance_id}'")
437438
return pb.RestartInstanceResponse(instanceId=new_instance_id)
438439

440+
@staticmethod
def _parse_work_item_filters(request: pb.GetWorkItemsRequest):
    """Build per-category name filters from a ``GetWorkItems`` request.

    Returns a 3-tuple ``(orchestration_names, activity_names, entity_names)``.
    Each element is the set of ``name`` values listed for that category, or
    ``None`` when that category has no entries. All three are ``None`` when
    the request carries no ``workItemFilters`` field at all. Only the filter
    names are read here; other fields on the filter entries are not used.
    """
    if not request.HasField("workItemFilters"):
        return None, None, None

    def names_or_none(entries):
        # An empty repeated field yields None rather than an empty set.
        if not entries:
            return None
        return {entry.name for entry in entries}

    filters = request.workItemFilters
    return (
        names_or_none(filters.orchestrations),
        names_or_none(filters.activities),
        names_or_none(filters.entities),
    )
450+
439451
def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
440452
"""Streams work items to the worker (orchestration, activity, and entity work items)."""
441453
self._logger.info("Worker connected and requesting work items")
454+
orch_filter, activity_filter, entity_filter = self._parse_work_item_filters(request)
442455

443456
try:
444457
while context.is_active() and not self._shutdown_event.is_set():
445458
work_item = None
446459

447460
with self._lock:
448461
# Check for orchestration work
462+
skipped_orchs: list[str] = []
449463
while self._orchestration_queue:
450464
instance_id = self._orchestration_queue.popleft()
451465
self._orchestration_queue_set.discard(instance_id)
@@ -454,11 +468,14 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
454468
if not instance or not instance.pending_events:
455469
continue
456470

471+
# Skip if orchestration name doesn't match filters
472+
if orch_filter is not None and instance.name not in orch_filter:
473+
skipped_orchs.append(instance_id)
474+
continue
475+
457476
if instance_id in self._orchestration_in_flight:
458477
# Already being processed — re-add to queue
459-
if instance_id not in self._orchestration_queue_set:
460-
self._orchestration_queue.append(instance_id)
461-
self._orchestration_queue_set.add(instance_id)
478+
skipped_orchs.append(instance_id)
462479
break
463480

464481
# Move pending events to dispatched_events
@@ -485,27 +502,58 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
485502
)
486503
break
487504

505+
# Re-queue skipped orchestrations for other workers
506+
for s in skipped_orchs:
507+
if s not in self._orchestration_queue_set:
508+
self._orchestration_queue.append(s)
509+
self._orchestration_queue_set.add(s)
510+
488511
# Check for activity work
489512
if not work_item and self._activity_queue:
490-
activity = self._activity_queue.popleft()
491-
work_item = pb.WorkItem(
492-
completionToken=str(activity.completion_token),
493-
activityRequest=pb.ActivityRequest(
494-
name=activity.name,
495-
taskId=activity.task_id,
496-
input=wrappers_pb2.StringValue(value=activity.input) if activity.input else None,
497-
orchestrationInstance=pb.OrchestrationInstance(instanceId=activity.instance_id)
513+
# Scan for the first matching activity
514+
skipped: list = []
515+
matched_activity = None
516+
while self._activity_queue:
517+
candidate = self._activity_queue.popleft()
518+
if activity_filter is not None and candidate.name not in activity_filter:
519+
skipped.append(candidate)
520+
continue
521+
matched_activity = candidate
522+
break
523+
# Put back non-matching items
524+
for s in skipped:
525+
self._activity_queue.append(s)
526+
527+
if matched_activity is not None:
528+
work_item = pb.WorkItem(
529+
completionToken=str(matched_activity.completion_token),
530+
activityRequest=pb.ActivityRequest(
531+
name=matched_activity.name,
532+
taskId=matched_activity.task_id,
533+
input=wrappers_pb2.StringValue(value=matched_activity.input) if matched_activity.input else None,
534+
orchestrationInstance=pb.OrchestrationInstance(instanceId=matched_activity.instance_id)
535+
)
498536
)
499-
)
500537

501538
# Check for entity work
502539
if not work_item:
540+
skipped_entities: list[str] = []
503541
while self._entity_queue:
504542
entity_id = self._entity_queue.popleft()
505543
self._entity_queue_set.discard(entity_id)
506544
entity = self._entities.get(entity_id)
507545

508546
if entity and entity.pending_operations:
547+
# Skip if entity name doesn't match filters
548+
if entity_filter is not None:
549+
try:
550+
parsed = EntityInstanceId.parse(entity_id)
551+
if parsed.entity not in entity_filter:
552+
skipped_entities.append(entity_id)
553+
continue
554+
except ValueError:
555+
pass
556+
509557
# Skip if this entity is already being processed
510558
if entity_id in self._entity_in_flight:
511559
continue
@@ -532,6 +580,12 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
532580
)
533581
break
534582

583+
# Re-queue skipped entities for other workers
584+
for s in skipped_entities:
585+
if s not in self._entity_queue_set:
586+
self._entity_queue.append(s)
587+
self._entity_queue_set.add(s)
588+
535589
if work_item:
536590
yield work_item
537591
else:

0 commit comments

Comments
 (0)