diff --git a/src/Client/Core/PurgeInstancesFilter.cs b/src/Client/Core/PurgeInstancesFilter.cs index 04c7f9c3f..4c20e2cde 100644 --- a/src/Client/Core/PurgeInstancesFilter.cs +++ b/src/Client/Core/PurgeInstancesFilter.cs @@ -14,4 +14,16 @@ public record PurgeInstancesFilter( DateTimeOffset? CreatedTo = null, IEnumerable? Statuses = null) { + /// + /// Gets or sets the maximum amount of time to spend purging instances in a single call. + /// If null (default), all matching instances are purged with no time limit. + /// When set, the purge operation stops deleting additional instances after this duration elapses + /// and returns a partial result. Callers can check and + /// re-invoke the purge to continue where it left off. + /// The value of depends on the backend implementation: + /// it may be false if the purge timed out, true if all instances were purged, + /// or null if the backend does not support reporting completion status. + /// Not all backends support this property; those that do not will ignore it. + /// + public TimeSpan? Timeout { get; init; } } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index eba7b9bc0..46e4dd2ed 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -488,6 +488,19 @@ public override Task PurgeAllInstancesAsync( request.PurgeInstanceFilter.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus())); } + if (filter?.Timeout is not null) + { + if (filter.Timeout.Value <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(filter), + filter.Timeout.Value, + "PurgeInstancesFilter.Timeout must be a positive TimeSpan."); + } + + request.PurgeInstanceFilter.Timeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(filter.Timeout.Value); + } + return this.PurgeInstancesCoreAsync(request, cancellation); } diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 0c34d986d..3d7c8eb49 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -25,6 +25,7 @@ message ActivityRequest { OrchestrationInstance orchestrationInstance = 4; int32 taskId = 5; TraceContext parentTraceContext = 6; + map tags = 7; } message ActivityResponse { @@ -320,6 +321,10 @@ message SendEntityMessageAction { } } +message RewindOrchestrationAction { + repeated HistoryEvent newHistory = 1; +} + message OrchestratorAction { int32 id = 1; oneof orchestratorActionType { @@ -330,6 +335,7 @@ message OrchestratorAction { CompleteOrchestrationAction completeOrchestration = 6; TerminateOrchestrationAction terminateOrchestration = 7; SendEntityMessageAction sendEntityMessage = 8; + RewindOrchestrationAction rewindOrchestration = 9; } } @@ -517,6 +523,7 @@ message PurgeInstanceFilter { google.protobuf.Timestamp createdTimeFrom = 1; google.protobuf.Timestamp createdTimeTo = 2; repeated OrchestrationStatus runtimeStatus = 3; + google.protobuf.Duration timeout = 4; } message PurgeInstancesResponse { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 743f3f8bd..b781a3903 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2026-02-24 00:01:28 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/1caadbd7ecfdf5f2309acbeac28a3e36d16aa156/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2026-04-06 16:10:08 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/bcf5af6a22caa70601bfc909918ba5937484279f/protos/orchestrator_service.proto diff --git a/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs index 8d098106c..f3da85ebf 100644 --- a/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs +++ b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs @@ -127,5 +127,44 @@ public async Task ScheduleNewOrchestrationInstanceAsync_ValidDedupeStatus_DoesNo var exception = await act.Should().ThrowAsync(); exception.Which.Should().NotBeOfType(); } + + [Fact] + public async Task PurgeAllInstancesAsync_NegativeTimeout_ThrowsArgumentOutOfRangeException() + { + // Arrange + var client = this.CreateClient(); + var filter = new PurgeInstancesFilter { Timeout = TimeSpan.FromSeconds(-1) }; + + // Act & Assert + Func act = async () => await client.PurgeAllInstancesAsync(filter); + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Timeout must be a positive TimeSpan."); + } + + [Fact] + public async Task PurgeAllInstancesAsync_ZeroTimeout_ThrowsArgumentOutOfRangeException() + { + // Arrange + var client = this.CreateClient(); + var filter = new PurgeInstancesFilter { Timeout = TimeSpan.Zero }; + + // Act & Assert + Func act = async () => await client.PurgeAllInstancesAsync(filter); + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Timeout must be a positive TimeSpan."); + } + + [Fact] + public async Task PurgeAllInstancesAsync_PositiveTimeout_DoesNotThrowValidationError() + { + // Arrange + var client = this.CreateClient(); + var filter = new PurgeInstancesFilter { Timeout = TimeSpan.FromSeconds(30) }; + + // Act & Assert - validation should pass; the call will fail at gRPC level, not validation + Func act = async () => await client.PurgeAllInstancesAsync(filter); + var exception = await act.Should().ThrowAsync(); + exception.Which.Should().NotBeOfType(); + } }