Skip to content

Implement work item filtering#128

Open
andystaples wants to merge 6 commits intomicrosoft:mainfrom
andystaples:andystaples/add-workitem-filtering
Open

Implement work item filtering#128
andystaples wants to merge 6 commits intomicrosoft:mainfrom
andystaples:andystaples/add-workitem-filtering

Conversation

@andystaples
Copy link
Copy Markdown
Contributor

Summary

This PR adds work item filtering to the Durable Task Python SDK, allowing workers to declare which orchestrations, activities, and entities they handle. Filters are sent to the backend via the GetWorkItemsRequest gRPC message, enabling server-side filtering so that only matching work items are dispatched to a given worker.

This is a port of the dotnet implementation, using an explicit opt-in design — filtering is disabled by default and must be enabled by calling use_work_item_filters() before starting the worker.

Changes

Core SDK (durabletask/worker.py)

  • New filter data classes: OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter, and WorkItemFilters
  • WorkItemFilters._from_registry(): Auto-generates filters from the worker's registered orchestrators, activities, and entities. Includes version constraints when VersionMatchStrategy.STRICT is configured.
  • WorkItemFilters._to_grpc(): Converts filters to protobuf WorkItemFilters message
  • TaskHubGrpcWorker.use_work_item_filters(): Public API accepting three forms:
    • No argument — auto-generate filters from registry at start time
    • WorkItemFilters instance — use explicit custom filters
    • None — clear previously set filters
  • start() / _async_run_loop(): Applies filters to the GetWorkItemsRequest sent to the backend

Public API (durabletask/__init__.py)

  • Exports WorkItemFilters alongside existing ConcurrencyOptions and VersioningOptions

In-memory backend (durabletask/testing/in_memory_backend.py)

  • Parses WorkItemFilters from GetWorkItemsRequest via _parse_work_item_filters()
  • Filters orchestration, activity, and entity work items during dispatch
  • Uses a collect-and-requeue pattern to avoid infinite loops when items don't match

Documentation

  • docs/features.md: New "Work item filtering" section covering auto-generation, explicit filters, version constraints, and opt-in behavior
  • docs/supported-patterns.md: New "Work item filtering" pattern with usage examples

Example

  • examples/work_item_filtering.py: Full sample demonstrating both auto-generated and explicit filter usage with DurableTaskSchedulerWorker

Tests

  • tests/durabletask/test_work_item_filters.py — 27 unit tests covering filter classes, _from_registry(), _to_grpc(), and use_work_item_filters() behavior
  • tests/durabletask/test_work_item_filters_e2e.py — 7 in-memory E2E tests: auto filters, explicit filters, no filters, cleared filters, entity filters, and non-matching filter scenarios
  • tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py — 7 DTS E2E tests mirroring the in-memory tests against the Durable Task Scheduler emulator

Usage

Auto-generated filters (recommended)

worker = TaskHubGrpcWorker()
worker.add_orchestrator(my_orchestrator)
worker.add_activity(my_activity)
worker.use_work_item_filters()  # derives filters from registered tasks
worker.start()

Explicit filters

from durabletask.worker import (
    WorkItemFilters,
    OrchestrationWorkItemFilter,
    ActivityWorkItemFilter,
)

worker = TaskHubGrpcWorker()
worker.add_orchestrator(my_orchestrator)
worker.add_activity(my_activity)
worker.use_work_item_filters(WorkItemFilters(
    orchestrations=[
        OrchestrationWorkItemFilter(name="my_orchestrator", versions=["v1"]),
    ],
    activities=[
        ActivityWorkItemFilter(name="my_activity"),
    ],
))
worker.start()

@andystaples andystaples requested review from berndverst and Copilot and removed request for Copilot March 27, 2026 18:12
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in “work item filtering” capability to the Durable Task Python SDK so workers can declare which orchestrations/activities/entities they handle, and sends those filters to the backend via GetWorkItemsRequest.workItemFilters for server-side dispatch filtering.

Changes:

  • Introduces filter model types (WorkItemFilters + per-kind filter classes) and a new worker API TaskHubGrpcWorker.use_work_item_filters() that can auto-generate filters from the registry or accept explicit filters.
  • Implements filter-aware dispatch in the in-memory backend and adds extensive unit + E2E coverage (in-memory + DTS).
  • Adds documentation and a runnable example demonstrating both auto-generated and explicit filters.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
durabletask/worker.py Adds filter classes, auto-generation logic, gRPC conversion, and worker API to enable/supply/clear filters.
durabletask/testing/in_memory_backend.py Parses workItemFilters from GetWorkItemsRequest and applies filtering during dispatch.
durabletask/init.py Exports WorkItemFilters from the top-level package.
docs/features.md Documents opt-in behavior, auto-generation, explicit filters, and clearing filters.
docs/supported-patterns.md Adds a “Work item filtering” supported pattern with usage examples.
examples/work_item_filtering.py Adds an end-to-end example showing auto-generated and explicit filters with DTS.
tests/durabletask/test_work_item_filters.py Unit tests for filter classes, registry auto-generation, grpc conversion, and worker API behavior.
tests/durabletask/test_work_item_filters_e2e.py In-memory E2E tests verifying matching, non-matching, cleared, and entity filtering behavior.
tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py DTS E2E tests mirroring the in-memory E2E scenarios.
.vscode/mcp.json Adds VS Code MCP server configuration (appears unrelated to filtering feature).

Copy link
Copy Markdown
Member

@berndverst berndverst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good implementation that faithfully ports the .NET work item filtering design. The filter types, auto-generation logic, version handling, and opt-in pattern all align correctly with the dotnet implementation. Tests are comprehensive (27 unit + 7 in-memory E2E + 7 DTS E2E), and the version-aware filtering tests go beyond what the .NET test suite covers.

A few inline suggestions — nothing blocking.

Note: The PR has a merge conflict in durabletask/__init__.pymain now includes LargePayloadStorageOptions and PayloadStore exports that aren't in the PR's base. Please rebase.

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.

…tem-filtering

# Conflicts:
#	docs/features.md
#	docs/supported-patterns.md
#	durabletask/__init__.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants