diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..a19ade07 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +CHANGELOG.md merge=union diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..0051119d --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,31 @@ +# Changelog + +## [Unreleased] + +### Breaking Changes + +#### `Activity::Info` workflow fields are now nullable + +With the introduction of Standalone Activities (see below), an activity is no longer guaranteed to +have been scheduled by a workflow. `Activity::Info#workflow_id`, `#workflow_run_id`, +`#workflow_type`, and `#workflow_namespace` are now nullable — they return `nil` when the activity +was started via `Client#start_activity` rather than from a workflow. A new `Activity::Info#namespace` +accessor is always set (falling back to the client's namespace for standalone activities) and is +the recommended replacement for the deprecated `#workflow_namespace`. + +Existing workflow-only code paths are unaffected at runtime. The recommended migration is to call +`Activity::Info#in_workflow?` and branch on the result. + +### Added + +#### Standalone Activities + +Activities can now be started directly from a client, independently of any workflow. `Client#start_activity` +and `Client#execute_activity` schedule a standalone activity execution by ID and task queue, accepting the +same `Activity::Definition` classes (or by-name strings/symbols) used in workflow-scheduled activities. +`Client::ActivityHandle` provides `#result`, `#describe`, `#cancel`, and `#terminate`; `Client#list_activities` +and `Client#count_activities` provide visibility-backed queries; and `Client#async_activity_handle` now +accepts a standalone-form `ActivityIDReference` (constructed via `ActivityIDReference.for_standalone`) for +async completion. + +See https://docs.temporal.io/standalone-activity for the cross-SDK feature overview. \ No newline at end of file diff --git a/README.md b/README.md index 4962b304..28c716d9 100644 --- a/README.md +++ b/README.md @@ -1092,6 +1092,63 @@ it will raise the error raised in the activity. The constructor of the environment has multiple keyword arguments that can be set to affect the activity context for the activity. +#### Standalone Activities + +Activities can be started directly from a client, outside the context of any workflow. Standalone activities reuse the +existing `Activity::Definition` class — the same activity code runs whether invoked from a workflow or as a standalone +activity. They are addressed by `activity_id` (with an optional `activity_run_id` for disambiguating re-runs). + +Start a standalone activity: + +```ruby +handle = client.start_activity( + MyActivity, + 'some-arg', + id: 'my-activity-id', + task_queue: 'my-task-queue', + start_to_close_timeout: 60 +) +result = handle.result # blocks until the activity completes +``` + +Or use the execute helper to start and wait: + +```ruby +result = client.execute_activity( + MyActivity, 'some-arg', + id: 'my-activity-id', task_queue: 'my-task-queue', start_to_close_timeout: 60 +) +``` + +Get a handle to an existing standalone activity to describe, cancel, terminate, or fetch its result: + +```ruby +handle = client.activity_handle('my-activity-id') +description = handle.describe # ActivityExecution::Description +result = handle.result # blocks until the activity reaches a terminal state +handle.cancel('reason for cancel') # or +handle.terminate('reason for terminate') +``` + +List and count standalone activities (visibility queries): + +```ruby +client.list_activities('ActivityType="MyActivity"').each { |exec| puts exec.activity_id } +count = client.count_activities('ActivityType="MyActivity"').count +``` + +Inside an activity body, `Temporalio::Activity::Context.current.info` exposes whether the activity is standalone +or workflow-scheduled: + +```ruby +info = Temporalio::Activity::Context.current.info +if info.in_workflow? + # info.workflow_id, info.workflow_run_id, info.workflow_type are set +else + # info.activity_run_id is set; workflow_* fields are nil +end +``` + ### Telemetry #### Metrics diff --git a/temporalio/lib/temporalio/activity/definition.rb b/temporalio/lib/temporalio/activity/definition.rb index 413545d2..3e87178a 100644 --- a/temporalio/lib/temporalio/activity/definition.rb +++ b/temporalio/lib/temporalio/activity/definition.rb @@ -155,6 +155,28 @@ class Info # @return [Object, nil] Result hint attr_reader :result_hint + # Resolve an activity argument into a `[name, arg_hints, result_hint]` triple. Used by + # `Client#start_activity` to accept any of: a `Definition` subclass, an instance of one, + # an `Info`, a `Symbol` activity name, or a `String` activity name. Class/instance/Info + # inputs carry their definition's hints; Symbol/String inputs return `nil` hints. + # + # @param activity [Class, Definition, Info, Symbol, String] Activity argument. + # @return [Array(String, Array[Object]?, Object?)] name, arg_hints, result_hint. + def self._type_and_hints_from_parameter(activity) + case activity + when String, Symbol + [activity.to_s, nil, nil] + when Class, Definition, Info + # Return or construct an Info -- needed because we want the checks in Info.initialize. + info = from_activity(activity) + raise ArgumentError, 'Cannot pass dynamic activity to start_activity' unless info.name + + [info.name.to_s, info.arg_hints, info.result_hint] + else + raise ArgumentError, "#{activity} is not an activity class, instance, info, symbol, or string" + end + end + # Obtain definition info representing the given activity, which can be a class, instance, or definition info. # # @param activity [Definition, Class, Info] Activity to get info for. diff --git a/temporalio/lib/temporalio/activity/info.rb b/temporalio/lib/temporalio/activity/info.rb index f5332d42..9f542610 100644 --- a/temporalio/lib/temporalio/activity/info.rb +++ b/temporalio/lib/temporalio/activity/info.rb @@ -7,11 +7,13 @@ module Temporalio module Activity Info = Data.define( :activity_id, + :activity_run_id, :activity_type, :attempt, :current_attempt_scheduled_time, :heartbeat_timeout, :local?, + :namespace, :priority, :retry_policy, :raw_heartbeat_details, @@ -31,16 +33,20 @@ module Activity # # @!attribute activity_id # @return [String] ID for the activity. + # @!attribute activity_run_id + # @return [String, nil] Run ID for a standalone activity execution. nil for activities scheduled from a workflow. # @!attribute activity_type # @return [String] Type name for the activity. # @!attribute attempt - # @return [Integer] Attempt the activity is on. + # @return [Integer] Attempt the activity is on. Attempts start at 1 and increment on each retry. # @!attribute current_attempt_scheduled_time # @return [Time] When the current attempt was scheduled. # @!attribute heartbeat_timeout # @return [Float, nil] Heartbeat timeout set by the caller. # @!attribute local? # @return [Boolean] Whether the activity is a local activity or not. + # @!attribute namespace + # @return [String] Namespace this activity is on. # @!attribute priority # @return [Priority] The priority of this activity. # @!attribute retry_policy @@ -64,17 +70,24 @@ module Activity # @return [String] Task token uniquely identifying this activity. Note, this is a `ASCII-8BIT` encoded string, not # a `UTF-8` encoded string nor a valid UTF-8 string. # @!attribute workflow_id - # @return [String] Workflow ID that started this activity. + # @return [String, nil] Workflow ID that started this activity. nil for standalone activities. # @!attribute workflow_namespace - # @return [String] Namespace this activity is on. + # @return [String, nil] Namespace of the workflow that scheduled this activity. Nil for standalone + # activities. Prefer {#namespace}, which is always set. + # @deprecated Use {#namespace} instead. # @!attribute workflow_run_id - # @return [String] Workflow run ID that started this activity. + # @return [String, nil] Workflow run ID that started this activity. nil for standalone activities. # @!attribute workflow_type - # @return [String] Workflow type name that started this activity. + # @return [String, nil] Workflow type name that started this activity. nil for standalone activities. # # @note WARNING: This class may have required parameters added to its constructor. Users should not instantiate this # class or it may break in incompatible ways. class Info + # @return [Boolean] True if this activity was scheduled by a workflow execution; false for standalone activities. + def in_workflow? + !workflow_id.nil? + end + # Convert raw heartbeat details into Ruby types. # # Note, this live-converts every invocation. diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 8e2d42d8..d6ad6e2a 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -3,6 +3,7 @@ require 'google/protobuf/well_known_types' require 'logger' require 'temporalio/api' +require 'temporalio/client/activity_handle' require 'temporalio/client/async_activity_handle' require 'temporalio/client/connection' require 'temporalio/client/interceptor' @@ -478,6 +479,189 @@ def workflow_handle( ) end + # Get a handle for an existing standalone activity. Useful when the activity was started elsewhere + # (a different process, or by another client) and you have only its ID. + # + # @param activity_id [String] ID for the activity. + # @param activity_run_id [String, nil] Run ID for the activity execution. If nil, operations target the + # latest run of the given activity ID. + # @param result_hint [Object, nil] Converter hint for the activity's result. Set this when you know what + # type the activity returns so {ActivityHandle#result}'s deserialization uses the right hint. + # + # @return [ActivityHandle] The activity handle. + def activity_handle(activity_id, activity_run_id: nil, result_hint: nil) + ActivityHandle.new(client: self, id: activity_id, run_id: activity_run_id, result_hint:) + end + + # Start a standalone activity execution and return its handle. + # + # @param activity [Class, Activity::Definition, Activity::Definition::Info, Symbol, String] + # Activity definition, definition class or activity name. + # @param args [Array] Arguments to the activity. + # @param id [String] Unique identifier for the activity execution. + # @param task_queue [String] Task queue to run the activity on. + # @param schedule_to_close_timeout [Float, nil] Schedule-to-close timeout in seconds. Either this or + # `start_to_close_timeout` must be specified. + # @param schedule_to_start_timeout [Float, nil] Schedule-to-start timeout in seconds. + # @param start_to_close_timeout [Float, nil] Start-to-close timeout in seconds. Either this or + # `schedule_to_close_timeout` must be specified. + # @param heartbeat_timeout [Float, nil] Heartbeat timeout in seconds. + # @param id_reuse_policy [ActivityIDReusePolicy] Controls behavior when an activity with the same ID + # was previously run and has reached a terminal state. Defaults to `ALLOW_DUPLICATE`. + # @param id_conflict_policy [ActivityIDConflictPolicy] Controls behavior when an activity with the same ID + # is currently running. Defaults to `FAIL` (reject the start attempt). + # @param retry_policy [RetryPolicy, nil] Retry policy for the activity. + # @param search_attributes [SearchAttributes, nil] Search attributes for the activity. + # @param static_summary [String, nil] Fixed single-line summary for this activity execution. + # @param static_details [String, nil] Fixed details for this activity execution. May be in markdown format. + # @param priority [Priority] Priority for the activity. + # @param arg_hints [Array, nil] Argument hints. + # @param result_hint [Object, nil] Result hint. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [ActivityHandle] Handle to the started activity. + # @raise [Error::ActivityAlreadyStartedError] Activity already exists with this ID. + # @raise [Error::RPCError] RPC error from call. + def start_activity( + activity, + *args, + id:, + task_queue:, + schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, + heartbeat_timeout: nil, + id_reuse_policy: ActivityIDReusePolicy::ALLOW_DUPLICATE, + id_conflict_policy: ActivityIDConflictPolicy::FAIL, + retry_policy: nil, + search_attributes: nil, + static_summary: nil, + static_details: nil, + priority: Priority.default, + arg_hints: nil, + result_hint: nil, + rpc_options: nil + ) + activity_name, defn_arg_hints, defn_result_hint = + Activity::Definition::Info._type_and_hints_from_parameter(activity) + @impl.start_activity(Interceptor::StartActivityInput.new( + activity: activity_name, + args:, + activity_id: id, + task_queue:, + schedule_to_close_timeout:, + schedule_to_start_timeout:, + start_to_close_timeout:, + heartbeat_timeout:, + id_reuse_policy:, + id_conflict_policy:, + retry_policy:, + search_attributes:, + static_summary:, + static_details:, + headers: {}, + priority:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, + rpc_options: + )) + end + + # Start a standalone activity execution and wait for its result. Shortcut for + # {start_activity} + {ActivityHandle#result}. + # + # @param activity [Class, Activity::Definition, Activity::Definition::Info, Symbol, String] + # Activity definition, definition class or activity name. + # @param args [Array] Arguments to the activity. + # @param id [String] Unique identifier for the activity execution. + # @param task_queue [String] Task queue to run the activity on. + # @param schedule_to_close_timeout [Float, nil] Schedule-to-close timeout in seconds. Either this or + # `start_to_close_timeout` must be specified. + # @param schedule_to_start_timeout [Float, nil] Schedule-to-start timeout in seconds. + # @param start_to_close_timeout [Float, nil] Start-to-close timeout in seconds. Either this or + # `schedule_to_close_timeout` must be specified. + # @param heartbeat_timeout [Float, nil] Heartbeat timeout in seconds. + # @param id_reuse_policy [ActivityIDReusePolicy] Controls behavior when an activity with the same ID + # was previously run and has reached a terminal state. Defaults to `ALLOW_DUPLICATE`. + # @param id_conflict_policy [ActivityIDConflictPolicy] Controls behavior when an activity with the same ID + # is currently running. Defaults to `FAIL` (reject the start attempt). + # @param retry_policy [RetryPolicy, nil] Retry policy for the activity. + # @param search_attributes [SearchAttributes, nil] Search attributes for the activity. + # @param static_summary [String, nil] Fixed single-line summary for this activity execution. + # @param static_details [String, nil] Fixed details for this activity execution. May be in markdown format. + # @param priority [Priority] Priority for the activity. + # @param arg_hints [Array, nil] Argument hints. + # @param result_hint [Object, nil] Result hint. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Object, nil] Successful result of the activity. + # @raise [Error::ActivityAlreadyStartedError] Activity already exists with this ID. + # @raise [Error::ActivityFailedError] With `cause` populated from the activity failure. + # @raise [Error::RPCError] RPC error from call. + def execute_activity( + activity, + *args, + id:, + task_queue:, + schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, + heartbeat_timeout: nil, + id_reuse_policy: ActivityIDReusePolicy::ALLOW_DUPLICATE, + id_conflict_policy: ActivityIDConflictPolicy::FAIL, + retry_policy: nil, + search_attributes: nil, + static_summary: nil, + static_details: nil, + priority: Priority.default, + arg_hints: nil, + result_hint: nil, + rpc_options: nil + ) + start_activity( + activity, + *args, + id:, + task_queue:, + schedule_to_close_timeout:, + schedule_to_start_timeout:, + start_to_close_timeout:, + heartbeat_timeout:, + id_reuse_policy:, + id_conflict_policy:, + retry_policy:, + search_attributes:, + static_summary:, + static_details:, + priority:, + arg_hints:, + result_hint:, + rpc_options: + ).result + end + + # List standalone activities matching a visibility query. + # + # @param query [String] Visibility list filter. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Enumerator] Lazy enumerable of matching activity executions. + # @raise [Error::RPCError] RPC error from call. + def list_activities(query, rpc_options: nil) + @impl.list_activities(Interceptor::ListActivitiesInput.new(query:, rpc_options:)) + end + + # Count standalone activities matching a visibility query. + # + # @param query [String] Visibility list filter. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [ActivityExecutionCount] Count of activities (with per-group counts if the query had a group-by clause). + # @raise [Error::RPCError] RPC error from call. + def count_activities(query, rpc_options: nil) + @impl.count_activities(Interceptor::CountActivitiesInput.new(query:, rpc_options:)) + end + # Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict # policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be # retrieved on the start workflow operation. diff --git a/temporalio/lib/temporalio/client/activity_execution.rb b/temporalio/lib/temporalio/client/activity_execution.rb new file mode 100644 index 00000000..d2a44ec4 --- /dev/null +++ b/temporalio/lib/temporalio/client/activity_execution.rb @@ -0,0 +1,217 @@ +# frozen_string_literal: true + +require 'temporalio/api' +require 'temporalio/client/activity_execution_status' +require 'temporalio/client/pending_activity_state' +require 'temporalio/internal/proto_utils' +require 'temporalio/priority' +require 'temporalio/retry_policy' +require 'temporalio/search_attributes' +require 'temporalio/worker_deployment_version' + +module Temporalio + class Client + # Info for a standalone activity execution. Returned by list_activities; extended by {Description} + # for describe results. + class ActivityExecution + # @return [Api::Activity::V1::ActivityExecutionListInfo, Api::Activity::V1::ActivityExecutionInfo] + # Underlying protobuf info. + attr_reader :raw_info + + # @!visibility private + def initialize(raw_info) + @raw_info = raw_info + @search_attributes = Internal::ProtoUtils::LazySearchAttributes.new(raw_info.search_attributes) + end + + # @return [String] ID for the activity. + def activity_id + @raw_info.activity_id + end + + # @return [String] Run ID for this activity execution attempt. + def activity_run_id + Internal::ProtoUtils.string_or(@raw_info.run_id, nil) + end + + # @return [String] Type name of the activity. + def activity_type + @raw_info.activity_type&.name + end + + # @return [Time, nil] When the activity was scheduled. + def schedule_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.schedule_time) + end + + # @return [Time, nil] When the activity reached a terminal state. + def close_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.close_time) + end + + # @return [ActivityExecutionStatus] Overall status for the activity. + def status + Internal::ProtoUtils.enum_to_int(Api::Enums::V1::ActivityExecutionStatus, @raw_info.status) + end + + # @return [SearchAttributes, nil] Search attributes attached to this activity if any. + def search_attributes + @search_attributes.get + end + + # @return [String] Task queue for the activity. + def task_queue + @raw_info.task_queue + end + + # @return [Float, nil] How long this activity has been running across all attempts, in seconds. + def execution_duration + Internal::ProtoUtils.duration_to_seconds(@raw_info.execution_duration) + end + + # Rich description of a standalone activity execution; returned by {ActivityHandle#describe}. + class Description < ActivityExecution + # @return [Api::WorkflowService::V1::DescribeActivityExecutionResponse] Underlying protobuf response. + attr_reader :raw_description + + # @!visibility private + def initialize(raw_description, data_converter) + super(raw_description.info) + @raw_description = raw_description + @data_converter = data_converter + end + + # @return [PendingActivityState, nil] More detailed breakdown of the running state when + # the activity's status is RUNNING; nil otherwise. + def run_state + Internal::ProtoUtils.enum_to_int(Api::Enums::V1::PendingActivityState, @raw_info.run_state, + zero_means_nil: true) + end + + # @return [Float, nil] Schedule-to-close timeout in seconds. + def schedule_to_close_timeout + Internal::ProtoUtils.duration_to_seconds(@raw_info.schedule_to_close_timeout) + end + + # @return [Float, nil] Schedule-to-start timeout in seconds. + def schedule_to_start_timeout + Internal::ProtoUtils.duration_to_seconds(@raw_info.schedule_to_start_timeout) + end + + # @return [Float, nil] Start-to-close timeout in seconds. + def start_to_close_timeout + Internal::ProtoUtils.duration_to_seconds(@raw_info.start_to_close_timeout) + end + + # @return [Float, nil] Heartbeat timeout in seconds. + def heartbeat_timeout + Internal::ProtoUtils.duration_to_seconds(@raw_info.heartbeat_timeout) + end + + # @return [Boolean] Whether the activity has recorded any heartbeat details. + def has_heartbeat_details? # rubocop:disable Naming/PredicatePrefix + !@raw_info.heartbeat_details&.payloads.nil? && !@raw_info.heartbeat_details.payloads.empty? + end + + # Deserialized last-heartbeat details. Empty when no heartbeat has been recorded. + # + # @param hints [Array, nil] Hints, if any, to assist conversion. + # @return [Array] Converted details. + def heartbeat_details(hints: nil) + @data_converter.from_payloads(@raw_info.heartbeat_details, hints:) + end + + # @return [RetryPolicy] Retry policy in effect for this activity. + def retry_policy + RetryPolicy._from_proto(@raw_info.retry_policy) + end + + # @return [Time, nil] Time the last heartbeat was recorded. + def last_heartbeat_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.last_heartbeat_time) + end + + # @return [Time, nil] Time the last attempt started. + def last_started_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.last_started_time) + end + + # @return [Integer] Current attempt number. Attempts start at 1 and increment on each retry. + def attempt + @raw_info.attempt + end + + # @return [Error::Failure, nil] Failure of the last failed attempt if any. + def last_failure + return nil unless @raw_info.last_failure + + @data_converter.from_failure(@raw_info.last_failure) + end + + # @return [Time, nil] Schedule time + schedule-to-close timeout. + def expiration_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.expiration_time) + end + + # @return [String, nil] Identity of the worker that last picked up this activity. + def last_worker_identity + Internal::ProtoUtils.string_or(@raw_info.last_worker_identity, nil) + end + + # @return [Float, nil] Time from last attempt failure to next retry, in seconds. + def current_retry_interval + Internal::ProtoUtils.duration_to_seconds(@raw_info.current_retry_interval) + end + + # @return [Time, nil] Time when the last attempt completed. + def last_attempt_complete_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.last_attempt_complete_time) + end + + # @return [Time, nil] Time when the next attempt will be scheduled. + def next_attempt_schedule_time + Internal::ProtoUtils.timestamp_to_time(@raw_info.next_attempt_schedule_time) + end + + # @return [WorkerDeploymentVersion, nil] Worker deployment version this activity was last dispatched to. + def last_deployment_version + raw = @raw_info.last_deployment_version + return nil unless raw + + WorkerDeploymentVersion.new( + deployment_name: raw.deployment_name, + build_id: raw.build_id + ) + end + + # @return [Priority] Priority of this activity. + def priority + Priority._from_proto(@raw_info.priority) + end + + # @return [String, nil] Reason given when cancellation was requested. + def canceled_reason + Internal::ProtoUtils.string_or(@raw_info.canceled_reason, nil) + end + + # @return [String, nil] Static user-metadata summary on the activity. + def static_summary + user_metadata.first + end + + # @return [String, nil] Static user-metadata details on the activity. May be in markdown format. + def static_details + user_metadata.last + end + + private + + def user_metadata + @user_metadata ||= Internal::ProtoUtils.from_user_metadata( + @raw_info.user_metadata, @data_converter + ) + end + end + end + end +end diff --git a/temporalio/lib/temporalio/client/activity_execution_count.rb b/temporalio/lib/temporalio/client/activity_execution_count.rb new file mode 100644 index 00000000..7cf75705 --- /dev/null +++ b/temporalio/lib/temporalio/client/activity_execution_count.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module Temporalio + class Client + # Result of a {Client#count_activities} call. + class ActivityExecutionCount + # @return [Integer] Approximate number of activities matching the query. If the query had a group-by clause, + # this is the sum of all the counts in {groups}. + attr_reader :count + + # @return [Array] Groups if the query had a group-by clause, or empty if not. + attr_reader :groups + + # @!visibility private + def initialize(count, groups) + @count = count + @groups = groups + end + + # Aggregation group if the activity count query had a group-by clause. + class AggregationGroup + # @return [Integer] Approximate number of activities matching the query for this group. + attr_reader :count + + # @return [Array] Search attribute values for this group. + attr_reader :group_values + + # @!visibility private + def initialize(count, group_values) + @count = count + @group_values = group_values + end + end + end + end +end diff --git a/temporalio/lib/temporalio/client/activity_execution_status.rb b/temporalio/lib/temporalio/client/activity_execution_status.rb new file mode 100644 index 00000000..7ac921b7 --- /dev/null +++ b/temporalio/lib/temporalio/client/activity_execution_status.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require 'temporalio/api' + +module Temporalio + class Client + # Status of a standalone activity execution. + module ActivityExecutionStatus + UNSPECIFIED = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_UNSPECIFIED + RUNNING = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_RUNNING + COMPLETED = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_COMPLETED + FAILED = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_FAILED + CANCELED = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_CANCELED + TERMINATED = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_TERMINATED + TIMED_OUT = Api::Enums::V1::ActivityExecutionStatus::ACTIVITY_EXECUTION_STATUS_TIMED_OUT + end + end +end diff --git a/temporalio/lib/temporalio/client/activity_handle.rb b/temporalio/lib/temporalio/client/activity_handle.rb new file mode 100644 index 00000000..ab4aa07c --- /dev/null +++ b/temporalio/lib/temporalio/client/activity_handle.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +require 'temporalio/api' +require 'temporalio/client/activity_execution' +require 'temporalio/client/interceptor' +require 'temporalio/error' + +module Temporalio + class Client + # Handle for interacting with a standalone activity. Usually created via {Client.activity_handle} + # or {Client#start_activity}. + class ActivityHandle + # @return [String] ID for the activity. + attr_reader :id + + # @return [String, nil] Run ID for this activity execution. When nil, this handle targets the latest run. + attr_reader :run_id + + # @return [Object, nil] Result hint used when deserializing the activity's result. May be overridden per + # {#result} call. + attr_reader :result_hint + + # @!visibility private + def initialize(client:, id:, run_id:, result_hint:) + @client = client + @id = id + @run_id = run_id + @result_hint = result_hint + end + + # Wait for the activity's outcome (result or failure). Internally long-polls + # PollActivityExecution and reissues until the activity reaches a terminal state, so this can + # block indefinitely for long-running activities. + # + # @param result_hint [Object, nil] Override the result hint. If nil, uses {#result_hint}. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Object, nil] Deserialized activity result. + # + # @raise [Error::ActivityFailedError] With `cause` populated from the activity failure. + # @raise [Error::RPCError] RPC error from call. + def result(result_hint: nil, rpc_options: nil) + hint = result_hint || @result_hint + outcome = @client._impl.fetch_activity_outcome( + Interceptor::FetchActivityOutcomeInput.new( + activity_id: id, + activity_run_id: run_id, + rpc_options: + ) + ) + _process_outcome(outcome, hint) + end + + # Describe the activity. + # + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [ActivityExecution::Description] Activity description. + # @raise [Error::RPCError] RPC error from call. + def describe(rpc_options: nil) + @client._impl.describe_activity( + Interceptor::DescribeActivityInput.new( + activity_id: id, + activity_run_id: run_id, + rpc_options: + ) + ) + end + + # Request cancellation of the activity. + # + # @param reason [String, nil] Optional cancellation reason recorded on the server. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @raise [Error::RPCError] RPC error from call. + def cancel(reason = nil, rpc_options: nil) + @client._impl.cancel_activity( + Interceptor::CancelActivityInput.new( + activity_id: id, + activity_run_id: run_id, + reason:, + rpc_options: + ) + ) + nil + end + + # Terminate the activity (force-close). + # + # @param reason [String, nil] Optional termination reason recorded on the activity's failure outcome. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @raise [Error::RPCError] RPC error from call. + def terminate(reason = nil, rpc_options: nil) + @client._impl.terminate_activity( + Interceptor::TerminateActivityInput.new( + activity_id: id, + activity_run_id: run_id, + reason:, + rpc_options: + ) + ) + nil + end + + private + + def _process_outcome(outcome, hint) + raise Error, 'Activity completed but outcome is missing from server response' if outcome.nil? + + case outcome.value + when :failure + cause = @client.data_converter.from_failure(outcome.failure) + raise Error::ActivityFailedError.new, cause: cause + when :result + @client.data_converter.from_payloads(outcome.result, hints: Array(hint)).first + else + raise Error, "Unknown activity outcome: #{outcome.value.inspect}" + end + end + end + end +end diff --git a/temporalio/lib/temporalio/client/activity_id_reference.rb b/temporalio/lib/temporalio/client/activity_id_reference.rb index 23eaf97a..3bcc941e 100644 --- a/temporalio/lib/temporalio/client/activity_id_reference.rb +++ b/temporalio/lib/temporalio/client/activity_id_reference.rb @@ -6,26 +6,57 @@ module Temporalio class Client - # Reference to an existing activity by its workflow ID, run ID, and activity ID. + # Reference to an activity for use with {Client#async_activity_handle}. There are two shapes, + # depending on whether the activity is run in a workflow, or as a standalone activity. + # + # 1. Activity run in a workflow -- use {ActivityIDReference#initialize}: + # `ActivityIDReference.new(workflow_id:, run_id:, activity_id:)`. + # + # 2. Standalone Activity (started via {Client#start_activity}): use the class factory + # {ActivityIDReference.for_standalone}: `ActivityIDReference.for_standalone(activity_id:, activity_run_id:)`. class ActivityIDReference - # @return [String] ID for the workflow. + # @return [String] ID for the activity. + attr_reader :activity_id + + # @return [String, nil] Activity run ID. Set only for standalone activity references. + attr_reader :activity_run_id + + # @return [String, nil] ID for the workflow. Set only for workflow-run activity references. attr_reader :workflow_id - # @return [String, nil] Run ID for the workflow. + # @return [String, nil] Run ID for the workflow. Set only for workflow-run activity references. attr_reader :run_id - # @return [String] ID for the activity. - attr_reader :activity_id + # Construct a standalone activity reference. + # + # @param activity_id [String] ID for the activity. + # @param activity_run_id [String, nil] Run ID for the activity execution. nil targets the latest run. + # @return [ActivityIDReference] A reference suitable for {Client#async_activity_handle}. + def self.for_standalone(activity_id:, activity_run_id: nil) + allocate.tap do |ref| + ref.instance_variable_set(:@activity_id, activity_id) + ref.instance_variable_set(:@activity_run_id, activity_run_id) + ref.instance_variable_set(:@workflow_id, nil) + ref.instance_variable_set(:@run_id, nil) + end + end - # Create an activity ID reference. + # Construct a workflow-run activity reference. # # @param workflow_id [String] ID for the workflow. # @param run_id [String, nil] Run ID for the workflow. - # @param activity_id [String] ID for the workflow. + # @param activity_id [String] ID for the activity. + # @return [ActivityIDReference] A reference suitable for {Client#async_activity_handle}. def initialize(workflow_id:, run_id:, activity_id:) @workflow_id = workflow_id @run_id = run_id @activity_id = activity_id + @activity_run_id = nil + end + + # @return [Boolean] True if this reference is the standalone-form (activity_run_id without workflow_id). + def standalone? + @workflow_id.nil? end end end diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index 361acd1c..69ce7479 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -259,6 +259,72 @@ def intercept_client(next_interceptor) :rpc_options ) + # Input for {Outbound.start_activity}. + StartActivityInput = Data.define( + :activity, + :args, + :activity_id, + :task_queue, + :schedule_to_close_timeout, + :schedule_to_start_timeout, + :start_to_close_timeout, + :heartbeat_timeout, + :id_reuse_policy, + :id_conflict_policy, + :retry_policy, + :search_attributes, + :static_summary, + :static_details, + :headers, + :priority, + :arg_hints, + :result_hint, + :rpc_options + ) + + # Input for {Outbound.describe_activity}. + DescribeActivityInput = Data.define( + :activity_id, + :activity_run_id, + :rpc_options + ) + + # Input for {Outbound.cancel_activity}. + CancelActivityInput = Data.define( + :activity_id, + :activity_run_id, + :reason, + :rpc_options + ) + + # Input for {Outbound.terminate_activity}. + TerminateActivityInput = Data.define( + :activity_id, + :activity_run_id, + :reason, + :rpc_options + ) + + # Input for {Outbound.list_activities}. + ListActivitiesInput = Data.define( + :query, + :rpc_options + ) + + # Input for {Outbound.count_activities}. + CountActivitiesInput = Data.define( + :query, + :rpc_options + ) + + # Input for {Outbound.fetch_activity_outcome}. Used by `ActivityHandle#result` for long-polling + # the activity outcome via `PollActivityExecution`. + FetchActivityOutcomeInput = Data.define( + :activity_id, + :activity_run_id, + :rpc_options + ) + # Outbound interceptor for intercepting client calls. This should be extended by users needing to intercept client # actions. class Outbound @@ -467,6 +533,60 @@ def fail_async_activity(input) def report_cancellation_async_activity(input) next_interceptor.report_cancellation_async_activity(input) end + + # Called for every {Client.start_activity} and {Client.execute_activity} call. + # + # @param input [StartActivityInput] Input. + # @return [ActivityHandle] Activity handle. + def start_activity(input) + next_interceptor.start_activity(input) + end + + # Called for every {ActivityHandle.describe} call. + # + # @param input [DescribeActivityInput] Input. + # @return [ActivityExecution::Description] Activity description. + def describe_activity(input) + next_interceptor.describe_activity(input) + end + + # Called for every {ActivityHandle.cancel} call. + # + # @param input [CancelActivityInput] Input. + def cancel_activity(input) + next_interceptor.cancel_activity(input) + end + + # Called for every {ActivityHandle.terminate} call. + # + # @param input [TerminateActivityInput] Input. + def terminate_activity(input) + next_interceptor.terminate_activity(input) + end + + # Called for every {Client.list_activities} call. + # + # @param input [ListActivitiesInput] Input. + # @return [Enumerator] Activity executions. + def list_activities(input) + next_interceptor.list_activities(input) + end + + # Called for every {Client.count_activities} call. + # + # @param input [CountActivitiesInput] Input. + # @return [ActivityExecutionCount] Activity count. + def count_activities(input) + next_interceptor.count_activities(input) + end + + # Called by {ActivityHandle.result} to long-poll the activity outcome. + # + # @param input [FetchActivityOutcomeInput] Input. + # @return [Api::Activity::V1::ActivityExecutionOutcome] Activity outcome. + def fetch_activity_outcome(input) + next_interceptor.fetch_activity_outcome(input) + end end end end diff --git a/temporalio/lib/temporalio/client/pending_activity_state.rb b/temporalio/lib/temporalio/client/pending_activity_state.rb new file mode 100644 index 00000000..b3d061b6 --- /dev/null +++ b/temporalio/lib/temporalio/client/pending_activity_state.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +require 'temporalio/api' + +module Temporalio + class Client + # More detailed breakdown of a running activity's state. + module PendingActivityState + SCHEDULED = Api::Enums::V1::PendingActivityState::PENDING_ACTIVITY_STATE_SCHEDULED + STARTED = Api::Enums::V1::PendingActivityState::PENDING_ACTIVITY_STATE_STARTED + CANCEL_REQUESTED = Api::Enums::V1::PendingActivityState::PENDING_ACTIVITY_STATE_CANCEL_REQUESTED + PAUSED = Api::Enums::V1::PendingActivityState::PENDING_ACTIVITY_STATE_PAUSED + PAUSE_REQUESTED = Api::Enums::V1::PendingActivityState::PENDING_ACTIVITY_STATE_PAUSE_REQUESTED + end + end +end diff --git a/temporalio/lib/temporalio/common_enums.rb b/temporalio/lib/temporalio/common_enums.rb index e6bdd6a2..ebc5df12 100644 --- a/temporalio/lib/temporalio/common_enums.rb +++ b/temporalio/lib/temporalio/common_enums.rb @@ -84,6 +84,30 @@ module SuggestContinueAsNewReason Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES end + # Controls behavior when an activity with the same ID was previously run and is now closed. + # + # @see https://docs.temporal.io/activities + module ActivityIDReusePolicy + # Always allow starting an activity using the same activity ID. + ALLOW_DUPLICATE = Api::Enums::V1::ActivityIdReusePolicy::ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE + # Allow starting an activity using the same ID only when the last activity execution was not successful. + ALLOW_DUPLICATE_FAILED_ONLY = + Api::Enums::V1::ActivityIdReusePolicy::ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY + # Do not permit re-use of the ID for this activity. Future start requests could potentially change the policy, + # allowing re-use of the ID. + REJECT_DUPLICATE = Api::Enums::V1::ActivityIdReusePolicy::ACTIVITY_ID_REUSE_POLICY_REJECT_DUPLICATE + end + + # Controls behavior when an activity with the same ID is currently running. + # + # @see https://docs.temporal.io/activities + module ActivityIDConflictPolicy + # Don't start a new activity; instead fail with already-started error. + FAIL = Api::Enums::V1::ActivityIdConflictPolicy::ACTIVITY_ID_CONFLICT_POLICY_FAIL + # Don't start a new activity; instead return a handle for the running activity. + USE_EXISTING = Api::Enums::V1::ActivityIdConflictPolicy::ACTIVITY_ID_CONFLICT_POLICY_USE_EXISTING + end + # Specifies when a workflow might move from a worker of one Build Id to another. module VersioningBehavior # Unspecified versioning behavior. By default, workers opting into worker versioning will diff --git a/temporalio/lib/temporalio/error.rb b/temporalio/lib/temporalio/error.rb index 0170990a..8d967864 100644 --- a/temporalio/lib/temporalio/error.rb +++ b/temporalio/lib/temporalio/error.rb @@ -40,6 +40,15 @@ def initialize(message = 'Workflow execution failed') end end + # Error returned from {Client::ActivityHandle#result} when the activity did not complete successfully. + # The specific activity failure can be accessed via `cause`. + class ActivityFailedError < Error + # @!visibility private + def initialize(message = 'Activity execution failed') + super + end + end + # Error that occurs when a workflow was continued as new. class WorkflowContinuedAsNewError < Error # @return [String] New execution run ID the workflow continued to. diff --git a/temporalio/lib/temporalio/error/failure.rb b/temporalio/lib/temporalio/error/failure.rb index 6edc5cd5..cd60e498 100644 --- a/temporalio/lib/temporalio/error/failure.rb +++ b/temporalio/lib/temporalio/error/failure.rb @@ -29,6 +29,26 @@ def initialize(workflow_id:, workflow_type:, run_id:) end end + # Error raised by a client when a standalone activity execution has already started. + class ActivityAlreadyStartedError < Failure + # @return [String] ID of the already-started activity. + attr_reader :activity_id + + # @return [String] Activity type name of the already-started activity. + attr_reader :activity_type + + # @return [String, nil] Run ID of the already-started activity if known. + attr_reader :activity_run_id + + # @!visibility private + def initialize(activity_id:, activity_type:, activity_run_id:) + super('Activity execution already started') + @activity_id = activity_id + @activity_type = activity_type + @activity_run_id = activity_run_id + end + end + # Error raised during workflow/activity execution. class ApplicationError < Failure # @return [Array] User-defined details on the error. diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index ac1e67ca..8dbf47f9 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -4,6 +4,9 @@ require 'securerandom' require 'temporalio/activity' require 'temporalio/api' +require 'temporalio/client/activity_execution' +require 'temporalio/client/activity_execution_count' +require 'temporalio/client/activity_handle' require 'temporalio/client/activity_id_reference' require 'temporalio/client/async_activity_handle' require 'temporalio/client/connection' @@ -27,6 +30,35 @@ module Temporalio module Internal module Client class Implementation < Temporalio::Client::Interceptor::Outbound + # Proto routing convention for standalone activity completion: `*_by_id` requests carry + # `resource_id = "activity:"`. See the resource_id field comment on + # `RecordActivityTaskHeartbeatByIdRequest` (and the analogous Completed/Failed/Canceled + # requests) in `workflowservice/v1/request_response.proto`. Workflow-scheduled activities + # leave resource_id empty. + STANDALONE_ACTIVITY_RESOURCE_ID_PREFIX = 'activity:' + + # Returns the activity_id / workflow_id / run_id / resource_id fields for a `*_by_id` + # async-completion request, depending on whether the reference is the standalone or + # workflow-bound form. Splat into the request constructor's kwargs alongside the + # request-specific fields. + def self._activity_id_reference_request_fields(ref) + if ref.standalone? + { + workflow_id: nil, + run_id: ref.activity_run_id, + activity_id: ref.activity_id, + resource_id: "#{STANDALONE_ACTIVITY_RESOURCE_ID_PREFIX}#{ref.activity_id}" + } + else + { + workflow_id: ref.workflow_id, + run_id: ref.run_id, + activity_id: ref.activity_id, + resource_id: nil + } + end + end + def self.with_default_rpc_options(user_rpc_options) # If the user did not provide an override_retry, we need to make sure # we use an option set that has it as "true" @@ -808,12 +840,11 @@ def update_schedule(input) end def heartbeat_async_activity(input) - resp = if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference) + ref = input.task_token_or_id_reference + resp = if ref.is_a?(Temporalio::Client::ActivityIDReference) @client.workflow_service.record_activity_task_heartbeat_by_id( Api::WorkflowService::V1::RecordActivityTaskHeartbeatByIdRequest.new( - workflow_id: input.task_token_or_id_reference.workflow_id, - run_id: input.task_token_or_id_reference.run_id, - activity_id: input.task_token_or_id_reference.activity_id, + **Implementation._activity_id_reference_request_fields(ref), namespace: @client.namespace, identity: @client.connection.identity, details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) @@ -823,7 +854,7 @@ def heartbeat_async_activity(input) else @client.workflow_service.record_activity_task_heartbeat( Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new( - task_token: input.task_token_or_id_reference, + task_token: ref, namespace: @client.namespace, identity: @client.connection.identity, details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) @@ -841,12 +872,11 @@ def heartbeat_async_activity(input) end def complete_async_activity(input) - if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference) + ref = input.task_token_or_id_reference + if ref.is_a?(Temporalio::Client::ActivityIDReference) @client.workflow_service.respond_activity_task_completed_by_id( Api::WorkflowService::V1::RespondActivityTaskCompletedByIdRequest.new( - workflow_id: input.task_token_or_id_reference.workflow_id, - run_id: input.task_token_or_id_reference.run_id, - activity_id: input.task_token_or_id_reference.activity_id, + **Implementation._activity_id_reference_request_fields(ref), namespace: @client.namespace, identity: @client.connection.identity, result: @client.data_converter.to_payloads([input.result], hints: Array(input.result_hint)) @@ -856,7 +886,7 @@ def complete_async_activity(input) else @client.workflow_service.respond_activity_task_completed( Api::WorkflowService::V1::RespondActivityTaskCompletedRequest.new( - task_token: input.task_token_or_id_reference, + task_token: ref, namespace: @client.namespace, identity: @client.connection.identity, result: @client.data_converter.to_payloads([input.result], hints: Array(input.result_hint)) @@ -876,12 +906,11 @@ def fail_async_activity(input) hints: input.last_heartbeat_detail_hints ) end - if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference) + ref = input.task_token_or_id_reference + if ref.is_a?(Temporalio::Client::ActivityIDReference) @client.workflow_service.respond_activity_task_failed_by_id( Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new( - workflow_id: input.task_token_or_id_reference.workflow_id, - run_id: input.task_token_or_id_reference.run_id, - activity_id: input.task_token_or_id_reference.activity_id, + **Implementation._activity_id_reference_request_fields(ref), namespace: @client.namespace, identity: @client.connection.identity, failure: @client.data_converter.to_failure(input.error), @@ -892,7 +921,7 @@ def fail_async_activity(input) else @client.workflow_service.respond_activity_task_failed( Api::WorkflowService::V1::RespondActivityTaskFailedRequest.new( - task_token: input.task_token_or_id_reference, + task_token: ref, namespace: @client.namespace, identity: @client.connection.identity, failure: @client.data_converter.to_failure(input.error), @@ -905,12 +934,11 @@ def fail_async_activity(input) end def report_cancellation_async_activity(input) - if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference) + ref = input.task_token_or_id_reference + if ref.is_a?(Temporalio::Client::ActivityIDReference) @client.workflow_service.respond_activity_task_canceled_by_id( Api::WorkflowService::V1::RespondActivityTaskCanceledByIdRequest.new( - workflow_id: input.task_token_or_id_reference.workflow_id, - run_id: input.task_token_or_id_reference.run_id, - activity_id: input.task_token_or_id_reference.activity_id, + **Implementation._activity_id_reference_request_fields(ref), namespace: @client.namespace, identity: @client.connection.identity, details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) @@ -920,7 +948,7 @@ def report_cancellation_async_activity(input) else @client.workflow_service.respond_activity_task_canceled( Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new( - task_token: input.task_token_or_id_reference, + task_token: ref, namespace: @client.namespace, identity: @client.connection.identity, details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) @@ -930,6 +958,166 @@ def report_cancellation_async_activity(input) end nil end + + def start_activity(input) + raise ArgumentError, 'activity_id is required' if input.activity_id.nil? || input.activity_id.empty? + raise ArgumentError, 'task_queue is required' if input.task_queue.nil? || input.task_queue.to_s.empty? + if input.schedule_to_close_timeout.nil? && input.start_to_close_timeout.nil? + raise ArgumentError, 'either schedule_to_close_timeout or start_to_close_timeout is required' + end + + req = Api::WorkflowService::V1::StartActivityExecutionRequest.new( + namespace: @client.namespace, + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + activity_id: input.activity_id, + activity_type: Api::Common::V1::ActivityType.new(name: input.activity), + task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s), + input: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), + schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), + schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout), + start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout), + heartbeat_timeout: ProtoUtils.seconds_to_duration(input.heartbeat_timeout), + id_reuse_policy: input.id_reuse_policy, + id_conflict_policy: input.id_conflict_policy, + retry_policy: input.retry_policy&._to_proto, + search_attributes: input.search_attributes&._to_proto, + user_metadata: ProtoUtils.to_user_metadata( + input.static_summary, input.static_details, @client.data_converter + ), + header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter), + priority: input.priority._to_proto + ) + + begin + resp = @client.workflow_service.start_activity_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + rescue Error::RPCError => e + if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first + details = e.grpc_status.details.first.unpack( + Api::ErrorDetails::V1::ActivityExecutionAlreadyStartedFailure + ) + if details + raise Error::ActivityAlreadyStartedError.new( + activity_id: input.activity_id, + activity_type: input.activity, + activity_run_id: details.run_id + ) + end + end + raise + end + + Temporalio::Client::ActivityHandle.new( + client: @client, + id: input.activity_id, + run_id: resp.run_id, + result_hint: input.result_hint + ) + end + + def describe_activity(input) + resp = @client.workflow_service.describe_activity_execution( + Api::WorkflowService::V1::DescribeActivityExecutionRequest.new( + namespace: @client.namespace, + activity_id: input.activity_id, + run_id: input.activity_run_id || '' + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + Temporalio::Client::ActivityExecution::Description.new(resp, @client.data_converter) + end + + def cancel_activity(input) + @client.workflow_service.request_cancel_activity_execution( + Api::WorkflowService::V1::RequestCancelActivityExecutionRequest.new( + namespace: @client.namespace, + activity_id: input.activity_id, + run_id: input.activity_run_id || '', + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + reason: input.reason + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + nil + end + + def terminate_activity(input) + @client.workflow_service.terminate_activity_execution( + Api::WorkflowService::V1::TerminateActivityExecutionRequest.new( + namespace: @client.namespace, + activity_id: input.activity_id, + run_id: input.activity_run_id || '', + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + reason: input.reason + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + nil + end + + def list_activities(input) + Enumerator.new do |yielder| + req = Api::WorkflowService::V1::ListActivityExecutionsRequest.new( + namespace: @client.namespace, + query: input.query || '' + ) + loop do + resp = @client.workflow_service.list_activity_executions( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + resp.executions.each do |raw_info| + yielder << Temporalio::Client::ActivityExecution.new(raw_info) + end + break if resp.next_page_token.empty? + + req.next_page_token = resp.next_page_token + end + end + end + + def count_activities(input) + resp = @client.workflow_service.count_activity_executions( + Api::WorkflowService::V1::CountActivityExecutionsRequest.new( + namespace: @client.namespace, + query: input.query || '' + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + Temporalio::Client::ActivityExecutionCount.new( + resp.count, + resp.groups.map do |group| + Temporalio::Client::ActivityExecutionCount::AggregationGroup.new( + group.count, + group.group_values.map { |payload| SearchAttributes._value_from_payload(payload) } + ) + end + ) + end + + # Long-polls PollActivityExecution until the activity reaches a terminal state. Returns the + # ActivityExecutionOutcome. The server's long-poll deadline may expire before the activity + # completes; in that case PollActivityExecutionResponse comes back with an unpopulated + # `outcome` field and the call must be reissued. + def fetch_activity_outcome(input) + req = Api::WorkflowService::V1::PollActivityExecutionRequest.new( + namespace: @client.namespace, + activity_id: input.activity_id, + run_id: input.activity_run_id || '' + ) + loop do + resp = @client.workflow_service.poll_activity_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + return resp.outcome if resp.outcome + end + end end end end diff --git a/temporalio/lib/temporalio/internal/worker/activity_worker.rb b/temporalio/lib/temporalio/internal/worker/activity_worker.rb index 46648b7a..b101cd20 100644 --- a/temporalio/lib/temporalio/internal/worker/activity_worker.rb +++ b/temporalio/lib/temporalio/internal/worker/activity_worker.rb @@ -168,9 +168,18 @@ def handle_cancel_task(task_token, cancel) end def execute_activity(task_token, defn, start) - # Build info + # Build info. Standalone activities have some empty fields on the wire; translate + # empty strings to nil for the user-facing Info fields. + workflow_id = Internal::ProtoUtils.string_or(start.workflow_execution&.workflow_id, nil) + workflow_run_id = Internal::ProtoUtils.string_or(start.workflow_execution&.run_id, nil) + workflow_type = Internal::ProtoUtils.string_or(start.workflow_type, nil) + activity_run_id = Internal::ProtoUtils.string_or(start.run_id, nil) + # `namespace` is always set. `workflow_namespace` is the deprecated accessor, nil for standalone activities. + namespace = Internal::ProtoUtils.string_or(start.workflow_namespace, @worker.options.client.namespace) + workflow_namespace = workflow_id.nil? ? nil : namespace info = Activity::Info.new( activity_id: start.activity_id, + activity_run_id: activity_run_id, activity_type: start.activity_type, attempt: start.attempt, current_attempt_scheduled_time: Internal::ProtoUtils.timestamp_to_time( @@ -178,6 +187,7 @@ def execute_activity(task_token, defn, start) ) || raise, # Never nil heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout), local?: start.is_local, + namespace: namespace, priority: Priority._from_proto(start.priority), raw_heartbeat_details: begin payloads = start.heartbeat_details.to_ary @@ -192,10 +202,10 @@ def execute_activity(task_token, defn, start) started_time: Internal::ProtoUtils.timestamp_to_time(start.started_time) || raise, # Never nil task_queue: @worker.options.task_queue, task_token:, - workflow_id: start.workflow_execution.workflow_id, - workflow_namespace: start.workflow_namespace, - workflow_run_id: start.workflow_execution.run_id, - workflow_type: start.workflow_type + workflow_id: workflow_id, + workflow_namespace: workflow_namespace, + workflow_run_id: workflow_run_id, + workflow_type: workflow_type ) # Build input diff --git a/temporalio/lib/temporalio/testing/activity_environment.rb b/temporalio/lib/temporalio/testing/activity_environment.rb index 001fc2a6..74928e60 100644 --- a/temporalio/lib/temporalio/testing/activity_environment.rb +++ b/temporalio/lib/temporalio/testing/activity_environment.rb @@ -18,11 +18,13 @@ class ActivityEnvironment def self.default_info @default_info ||= Activity::Info.new( activity_id: 'test', + activity_run_id: nil, activity_type: 'unknown', attempt: 1, current_attempt_scheduled_time: Time.at(0), heartbeat_timeout: nil, local?: false, + namespace: 'default', priority: Temporalio::Priority.default, raw_heartbeat_details: [], retry_policy: RetryPolicy.new, diff --git a/temporalio/sig/temporalio/activity/definition.rbs b/temporalio/sig/temporalio/activity/definition.rbs index 9e4f8b9a..25df6625 100644 --- a/temporalio/sig/temporalio/activity/definition.rbs +++ b/temporalio/sig/temporalio/activity/definition.rbs @@ -41,6 +41,10 @@ module Temporalio def self.from_activity: (Definition | singleton(Definition) | Info activity) -> Info + def self._type_and_hints_from_parameter: ( + singleton(Definition) | Definition | Info | Symbol | String activity + ) -> [String, Array[Object]?, Object?] + def initialize: ( name: String | Symbol | nil, ?instance: Object | Proc | nil, diff --git a/temporalio/sig/temporalio/activity/info.rbs b/temporalio/sig/temporalio/activity/info.rbs index 3e1a673f..f4bdece1 100644 --- a/temporalio/sig/temporalio/activity/info.rbs +++ b/temporalio/sig/temporalio/activity/info.rbs @@ -2,11 +2,13 @@ module Temporalio module Activity class Info attr_reader activity_id: String + attr_reader activity_run_id: String? attr_reader activity_type: String attr_reader attempt: Integer attr_reader current_attempt_scheduled_time: Time attr_reader heartbeat_timeout: Float? attr_reader local?: bool + attr_reader namespace: String attr_reader priority: Temporalio::Priority attr_reader raw_heartbeat_details: Array[Converters::RawValue] attr_reader retry_policy: RetryPolicy? @@ -16,18 +18,20 @@ module Temporalio attr_reader started_time: Time attr_reader task_queue: String attr_reader task_token: String - attr_reader workflow_id: String - attr_reader workflow_namespace: String - attr_reader workflow_run_id: String - attr_reader workflow_type: String + attr_reader workflow_id: String? + attr_reader workflow_namespace: String? + attr_reader workflow_run_id: String? + attr_reader workflow_type: String? def initialize: ( activity_id: String, + activity_run_id: String?, activity_type: String, attempt: Integer, current_attempt_scheduled_time: Time, heartbeat_timeout: Float?, local?: bool, + namespace: String, priority: Temporalio::Priority?, raw_heartbeat_details: Array[Converters::RawValue], retry_policy: RetryPolicy?, @@ -37,12 +41,14 @@ module Temporalio started_time: Time, task_queue: String, task_token: String, - workflow_id: String, - workflow_namespace: String, - workflow_run_id: String, - workflow_type: String + workflow_id: String?, + workflow_namespace: String?, + workflow_run_id: String?, + workflow_type: String? ) -> void + def in_workflow?: -> bool + def heartbeat_details: (?hints: Array[Object]?) -> Array[Object?] def with: (**untyped) -> Info diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index 66c4b73e..4fa2f289 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -71,6 +71,7 @@ module Temporalio def connection: -> Connection def namespace: -> String def data_converter: -> Converters::DataConverter + def _impl: -> Interceptor::Outbound def workflow_service: -> Connection::WorkflowService def operator_service: -> Connection::OperatorService @@ -131,6 +132,58 @@ module Temporalio ?result_hint: Object? ) -> WorkflowHandle + def activity_handle: ( + String activity_id, + ?activity_run_id: String?, + ?result_hint: Object? + ) -> ActivityHandle + + def start_activity: ( + singleton(Activity::Definition) | Activity::Definition::Info | Symbol | String activity, + *Object? args, + id: String, + task_queue: String, + ?schedule_to_close_timeout: duration?, + ?schedule_to_start_timeout: duration?, + ?start_to_close_timeout: duration?, + ?heartbeat_timeout: duration?, + ?id_reuse_policy: ActivityIDReusePolicy::enum, + ?id_conflict_policy: ActivityIDConflictPolicy::enum, + ?retry_policy: RetryPolicy?, + ?search_attributes: SearchAttributes?, + ?static_summary: String?, + ?static_details: String?, + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, + ?rpc_options: RPCOptions? + ) -> ActivityHandle + + def execute_activity: ( + singleton(Activity::Definition) | Activity::Definition::Info | Symbol | String activity, + *Object? args, + id: String, + task_queue: String, + ?schedule_to_close_timeout: duration?, + ?schedule_to_start_timeout: duration?, + ?start_to_close_timeout: duration?, + ?heartbeat_timeout: duration?, + ?id_reuse_policy: ActivityIDReusePolicy::enum, + ?id_conflict_policy: ActivityIDConflictPolicy::enum, + ?retry_policy: RetryPolicy?, + ?search_attributes: SearchAttributes?, + ?static_summary: String?, + ?static_details: String?, + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, + ?rpc_options: RPCOptions? + ) -> Object? + + def list_activities: (String query, ?rpc_options: RPCOptions?) -> Enumerator[ActivityExecution, ActivityExecution] + + def count_activities: (String query, ?rpc_options: RPCOptions?) -> ActivityExecutionCount + def start_update_with_start_workflow: ( Workflow::Definition::Update | Symbol | String update, *Object? args, diff --git a/temporalio/sig/temporalio/client/activity_execution.rbs b/temporalio/sig/temporalio/client/activity_execution.rbs new file mode 100644 index 00000000..dfa9a0dd --- /dev/null +++ b/temporalio/sig/temporalio/client/activity_execution.rbs @@ -0,0 +1,50 @@ +module Temporalio + class Client + class ActivityExecution + attr_reader raw_info: Api::Activity::V1::ActivityExecutionListInfo | Api::Activity::V1::ActivityExecutionInfo + + def initialize: (Api::Activity::V1::ActivityExecutionListInfo | Api::Activity::V1::ActivityExecutionInfo raw_info) -> void + + def activity_id: -> String + def activity_run_id: -> String? + def activity_type: -> String + def schedule_time: -> Time? + def close_time: -> Time? + def status: -> ActivityExecutionStatus::enum + def search_attributes: -> SearchAttributes? + def task_queue: -> String + def execution_duration: -> Float? + + class Description < ActivityExecution + attr_reader raw_description: Api::WorkflowService::V1::DescribeActivityExecutionResponse + + def initialize: (Api::WorkflowService::V1::DescribeActivityExecutionResponse raw_description, Converters::DataConverter data_converter) -> void + + def run_state: -> PendingActivityState::enum? + def schedule_to_close_timeout: -> Float? + def schedule_to_start_timeout: -> Float? + def start_to_close_timeout: -> Float? + def heartbeat_timeout: -> Float? + def has_heartbeat_details?: -> bool + def heartbeat_details: (?hints: Array[Object]?) -> Array[Object?] + def retry_policy: -> RetryPolicy + def last_heartbeat_time: -> Time? + def last_started_time: -> Time? + def attempt: -> Integer + def last_failure: -> Error::Failure? + def expiration_time: -> Time? + def last_worker_identity: -> String? + def current_retry_interval: -> Float? + def last_attempt_complete_time: -> Time? + def next_attempt_schedule_time: -> Time? + def last_deployment_version: -> WorkerDeploymentVersion? + def priority: -> Priority + def canceled_reason: -> String? + def static_summary: -> String? + def static_details: -> String? + + private def user_metadata: -> [String?, String?] + end + end + end +end diff --git a/temporalio/sig/temporalio/client/activity_execution_count.rbs b/temporalio/sig/temporalio/client/activity_execution_count.rbs new file mode 100644 index 00000000..64d1fed2 --- /dev/null +++ b/temporalio/sig/temporalio/client/activity_execution_count.rbs @@ -0,0 +1,17 @@ +module Temporalio + class Client + class ActivityExecutionCount + attr_reader count: Integer + attr_reader groups: Array[AggregationGroup] + + def initialize: (Integer count, Array[AggregationGroup] groups) -> void + + class AggregationGroup + attr_reader count: Integer + attr_reader group_values: Array[Object?] + + def initialize: (Integer count, Array[Object?] group_values) -> void + end + end + end +end diff --git a/temporalio/sig/temporalio/client/activity_execution_status.rbs b/temporalio/sig/temporalio/client/activity_execution_status.rbs new file mode 100644 index 00000000..39618ec0 --- /dev/null +++ b/temporalio/sig/temporalio/client/activity_execution_status.rbs @@ -0,0 +1,15 @@ +module Temporalio + class Client + module ActivityExecutionStatus + type enum = Integer + + UNSPECIFIED: enum + RUNNING: enum + COMPLETED: enum + FAILED: enum + CANCELED: enum + TERMINATED: enum + TIMED_OUT: enum + end + end +end diff --git a/temporalio/sig/temporalio/client/activity_handle.rbs b/temporalio/sig/temporalio/client/activity_handle.rbs new file mode 100644 index 00000000..e83a3514 --- /dev/null +++ b/temporalio/sig/temporalio/client/activity_handle.rbs @@ -0,0 +1,26 @@ +module Temporalio + class Client + class ActivityHandle + attr_reader id: String + attr_reader run_id: String? + attr_reader result_hint: Object? + + def initialize: ( + client: Client, + id: String, + run_id: String?, + result_hint: Object? + ) -> void + + def result: (?result_hint: Object?, ?rpc_options: RPCOptions?) -> Object? + + def describe: (?rpc_options: RPCOptions?) -> ActivityExecution::Description + + def cancel: (?String? reason, ?rpc_options: RPCOptions?) -> void + + def terminate: (?String? reason, ?rpc_options: RPCOptions?) -> void + + private def _process_outcome: (Api::Activity::V1::ActivityExecutionOutcome? outcome, Object? hint) -> Object? + end + end +end diff --git a/temporalio/sig/temporalio/client/activity_id_reference.rbs b/temporalio/sig/temporalio/client/activity_id_reference.rbs index 1c55861e..9a507557 100644 --- a/temporalio/sig/temporalio/client/activity_id_reference.rbs +++ b/temporalio/sig/temporalio/client/activity_id_reference.rbs @@ -1,11 +1,16 @@ module Temporalio class Client class ActivityIDReference - attr_reader workflow_id: String - attr_reader run_id: String? attr_reader activity_id: String + attr_reader activity_run_id: String? + attr_reader workflow_id: String? + attr_reader run_id: String? def initialize: (workflow_id: String, run_id: String?, activity_id: String) -> void + + def self.for_standalone: (activity_id: String, ?activity_run_id: String?) -> ActivityIDReference + + def standalone?: -> bool end end -end \ No newline at end of file +end diff --git a/temporalio/sig/temporalio/client/interceptor.rbs b/temporalio/sig/temporalio/client/interceptor.rbs index 98580b55..e51333a1 100644 --- a/temporalio/sig/temporalio/client/interceptor.rbs +++ b/temporalio/sig/temporalio/client/interceptor.rbs @@ -432,6 +432,116 @@ module Temporalio ) -> void end + class StartActivityInput + attr_reader activity: String + attr_reader args: Array[Object?] + attr_reader activity_id: String + attr_reader task_queue: String + attr_reader schedule_to_close_timeout: duration? + attr_reader schedule_to_start_timeout: duration? + attr_reader start_to_close_timeout: duration? + attr_reader heartbeat_timeout: duration? + attr_reader id_reuse_policy: ActivityIDReusePolicy::enum + attr_reader id_conflict_policy: ActivityIDConflictPolicy::enum + attr_reader retry_policy: RetryPolicy? + attr_reader search_attributes: SearchAttributes? + attr_reader static_summary: String? + attr_reader static_details: String? + attr_reader headers: Hash[String, Object?] + attr_reader priority: Priority + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + activity: String, + args: Array[Object?], + activity_id: String, + task_queue: String, + schedule_to_close_timeout: duration?, + schedule_to_start_timeout: duration?, + start_to_close_timeout: duration?, + heartbeat_timeout: duration?, + id_reuse_policy: ActivityIDReusePolicy::enum, + id_conflict_policy: ActivityIDConflictPolicy::enum, + retry_policy: RetryPolicy?, + search_attributes: SearchAttributes?, + static_summary: String?, + static_details: String?, + headers: Hash[String, Object?], + priority: Priority, + arg_hints: Array[Object]?, + result_hint: Object?, + rpc_options: RPCOptions? + ) -> void + end + + class DescribeActivityInput + attr_reader activity_id: String + attr_reader activity_run_id: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + activity_id: String, + activity_run_id: String?, + rpc_options: RPCOptions? + ) -> void + end + + class CancelActivityInput + attr_reader activity_id: String + attr_reader activity_run_id: String? + attr_reader reason: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + activity_id: String, + activity_run_id: String?, + reason: String?, + rpc_options: RPCOptions? + ) -> void + end + + class TerminateActivityInput + attr_reader activity_id: String + attr_reader activity_run_id: String? + attr_reader reason: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + activity_id: String, + activity_run_id: String?, + reason: String?, + rpc_options: RPCOptions? + ) -> void + end + + class ListActivitiesInput + attr_reader query: String + attr_reader rpc_options: RPCOptions? + + def initialize: (query: String, rpc_options: RPCOptions?) -> void + end + + class CountActivitiesInput + attr_reader query: String + attr_reader rpc_options: RPCOptions? + + def initialize: (query: String, rpc_options: RPCOptions?) -> void + end + + class FetchActivityOutcomeInput + attr_reader activity_id: String + attr_reader activity_run_id: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + activity_id: String, + activity_run_id: String?, + rpc_options: RPCOptions? + ) -> void + end + class Outbound attr_reader next_interceptor: Outbound @@ -490,6 +600,20 @@ module Temporalio def fail_async_activity: (FailAsyncActivityInput input) -> void def report_cancellation_async_activity: (ReportCancellationAsyncActivityInput input) -> void + + def start_activity: (StartActivityInput input) -> ActivityHandle + + def describe_activity: (DescribeActivityInput input) -> ActivityExecution::Description + + def cancel_activity: (CancelActivityInput input) -> void + + def terminate_activity: (TerminateActivityInput input) -> void + + def list_activities: (ListActivitiesInput input) -> Enumerator[ActivityExecution, ActivityExecution] + + def count_activities: (CountActivitiesInput input) -> ActivityExecutionCount + + def fetch_activity_outcome: (FetchActivityOutcomeInput input) -> Api::Activity::V1::ActivityExecutionOutcome? end end end diff --git a/temporalio/sig/temporalio/client/pending_activity_state.rbs b/temporalio/sig/temporalio/client/pending_activity_state.rbs new file mode 100644 index 00000000..e9cdc852 --- /dev/null +++ b/temporalio/sig/temporalio/client/pending_activity_state.rbs @@ -0,0 +1,14 @@ +module Temporalio + class Client + module PendingActivityState + type enum = Integer + + UNSPECIFIED: enum + SCHEDULED: enum + STARTED: enum + CANCEL_REQUESTED: enum + PAUSED: enum + PAUSE_REQUESTED: enum + end + end +end diff --git a/temporalio/sig/temporalio/common_enums.rbs b/temporalio/sig/temporalio/common_enums.rbs index 9c16555e..14b849ed 100644 --- a/temporalio/sig/temporalio/common_enums.rbs +++ b/temporalio/sig/temporalio/common_enums.rbs @@ -41,4 +41,20 @@ module Temporalio PINNED: enum AUTO_UPGRADE: enum end + + module ActivityIDReusePolicy + type enum = Integer + + ALLOW_DUPLICATE: enum + ALLOW_DUPLICATE_FAILED_ONLY: enum + REJECT_DUPLICATE: enum + end + + module ActivityIDConflictPolicy + type enum = Integer + + UNSPECIFIED: enum + FAIL: enum + USE_EXISTING: enum + end end diff --git a/temporalio/sig/temporalio/error.rbs b/temporalio/sig/temporalio/error.rbs index 945055da..72675e9b 100644 --- a/temporalio/sig/temporalio/error.rbs +++ b/temporalio/sig/temporalio/error.rbs @@ -12,6 +12,10 @@ module Temporalio def initialize: -> void end + class ActivityFailedError < Error + def initialize: -> void + end + class WorkflowContinuedAsNewError < Error attr_reader new_run_id: String diff --git a/temporalio/sig/temporalio/error/failure.rbs b/temporalio/sig/temporalio/error/failure.rbs index 738f3b4a..6b37ad39 100644 --- a/temporalio/sig/temporalio/error/failure.rbs +++ b/temporalio/sig/temporalio/error/failure.rbs @@ -11,6 +11,14 @@ module Temporalio def initialize: (workflow_id: String, workflow_type: String, run_id: String?) -> void end + class ActivityAlreadyStartedError < Failure + attr_reader activity_id: String + attr_reader activity_type: String + attr_reader activity_run_id: String? + + def initialize: (activity_id: String, activity_type: String, activity_run_id: String?) -> void + end + class ApplicationError < Failure attr_reader details: Array[Object?] attr_reader type: String? diff --git a/temporalio/sig/temporalio/internal/client/implementation.rbs b/temporalio/sig/temporalio/internal/client/implementation.rbs index 06965b22..e6cc9993 100644 --- a/temporalio/sig/temporalio/internal/client/implementation.rbs +++ b/temporalio/sig/temporalio/internal/client/implementation.rbs @@ -2,13 +2,31 @@ module Temporalio module Internal module Client class Implementation < Temporalio::Client::Interceptor::Outbound + STANDALONE_ACTIVITY_RESOURCE_ID_PREFIX: String + + def self._activity_id_reference_request_fields: (Temporalio::Client::ActivityIDReference ref) -> Hash[Symbol, String?] + def self.with_default_rpc_options: (Temporalio::Client::RPCOptions? user_rpc_options) -> Temporalio::Client::RPCOptions def initialize: (Temporalio::Client client) -> void - + def _start_workflow_request_from_with_start_options: ( untyped klass, Temporalio::Client::WithStartWorkflowOperation::Options start_options ) -> untyped + + def start_activity: (Temporalio::Client::Interceptor::StartActivityInput input) -> Temporalio::Client::ActivityHandle + + def describe_activity: (Temporalio::Client::Interceptor::DescribeActivityInput input) -> Temporalio::Client::ActivityExecution::Description + + def cancel_activity: (Temporalio::Client::Interceptor::CancelActivityInput input) -> void + + def terminate_activity: (Temporalio::Client::Interceptor::TerminateActivityInput input) -> void + + def list_activities: (Temporalio::Client::Interceptor::ListActivitiesInput input) -> Enumerator[Temporalio::Client::ActivityExecution, Temporalio::Client::ActivityExecution] + + def count_activities: (Temporalio::Client::Interceptor::CountActivitiesInput input) -> Temporalio::Client::ActivityExecutionCount + + def fetch_activity_outcome: (Temporalio::Client::Interceptor::FetchActivityOutcomeInput input) -> Temporalio::Api::Activity::V1::ActivityExecutionOutcome? end end end diff --git a/temporalio/test/activity_info_standalone_test.rb b/temporalio/test/activity_info_standalone_test.rb new file mode 100644 index 00000000..98372500 --- /dev/null +++ b/temporalio/test/activity_info_standalone_test.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' +require 'temporalio/workflow' +require 'test' + +# Verify Activity::Info exposes the standalone-vs-workflow distinction correctly from inside the activity body. +# +# Two activity definitions report their info to a captured value; the tests then assert what the +# captured info contains based on how the activity was scheduled (standalone via Client#execute_activity vs. +# from a workflow via Workflow.execute_activity). +class ActivityInfoStandaloneTest < Test + CAPTURED = Queue.new + + class ReportInfoActivity < Temporalio::Activity::Definition + def execute + info = Temporalio::Activity::Context.current.info + CAPTURED << { + in_workflow: info.in_workflow?, + activity_run_id: info.activity_run_id, + workflow_id: info.workflow_id, + workflow_run_id: info.workflow_run_id, + workflow_type: info.workflow_type, + namespace: info.namespace, + workflow_namespace: info.workflow_namespace + } + 'reported' + end + end + + class CallActivityWorkflow < Temporalio::Workflow::Definition + def execute(task_queue) + Temporalio::Workflow.execute_activity(ReportInfoActivity, start_to_close_timeout: 10, task_queue: task_queue) + end + end + + def capture_next + Timeout.timeout(15) { CAPTURED.pop } + end + + def drain_captures + CAPTURED.clear + end + + def test_info_standalone_fields_set_correctly + drain_captures + task_queue = "saa-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new(client: env.client, task_queue: task_queue, activities: [ReportInfoActivity]) + worker.run do + env.client.execute_activity( + ReportInfoActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + captured = capture_next + refute captured[:in_workflow], 'standalone activity should report in_workflow? == false' + refute_nil captured[:activity_run_id], 'standalone activity should have a non-nil activity_run_id' + assert_nil captured[:workflow_id], 'standalone activity should have nil workflow_id' + assert_nil captured[:workflow_run_id], 'standalone activity should have nil workflow_run_id' + assert_nil captured[:workflow_type], 'standalone activity should have nil workflow_type' + assert_nil captured[:workflow_namespace], 'standalone activity should have nil workflow_namespace' + assert_equal env.client.namespace, captured[:namespace] + end + + def test_info_workflow_scheduled_fields_set_correctly + drain_captures + task_queue = "saa-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + workflows: [CallActivityWorkflow], + activities: [ReportInfoActivity] + ) + worker.run do + handle = env.client.start_workflow( + CallActivityWorkflow, + task_queue, + id: "wf-#{SecureRandom.uuid}", + task_queue: task_queue + ) + handle.result + end + captured = capture_next + assert captured[:in_workflow], 'workflow-scheduled activity should report in_workflow? == true' + refute_nil captured[:workflow_id], 'workflow-scheduled activity should have non-nil workflow_id' + refute_nil captured[:workflow_run_id], 'workflow-scheduled activity should have non-nil workflow_run_id' + refute_nil captured[:workflow_type], 'workflow-scheduled activity should have non-nil workflow_type' + assert_equal 'CallActivityWorkflow', captured[:workflow_type] + assert_equal env.client.namespace, captured[:namespace] + assert_equal env.client.namespace, captured[:workflow_namespace], + 'workflow-scheduled activity should have workflow_namespace equal to namespace' + end +end diff --git a/temporalio/test/client/activity_id_reference_test.rb b/temporalio/test/client/activity_id_reference_test.rb new file mode 100644 index 00000000..1a759d3a --- /dev/null +++ b/temporalio/test/client/activity_id_reference_test.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require 'temporalio/client/activity_id_reference' +require 'test' + +class ActivityIDReferenceTest < Test + # `.new(workflow_id:, run_id:, activity_id:)` constructs a workflow-form reference. + def test_new_constructs_workflow_form + ref = Temporalio::Client::ActivityIDReference.new( + workflow_id: 'wf-1', run_id: 'r-1', activity_id: 'act-1' + ) + assert_equal 'wf-1', ref.workflow_id + assert_equal 'r-1', ref.run_id + assert_equal 'act-1', ref.activity_id + assert_nil ref.activity_run_id + refute_predicate ref, :standalone? + end + + def test_new_accepts_nil_run_id + ref = Temporalio::Client::ActivityIDReference.new( + workflow_id: 'wf-1', run_id: nil, activity_id: 'act-1' + ) + assert_nil ref.run_id + refute_predicate ref, :standalone? + end + + # The standalone factory bypasses `initialize` via `allocate.tap` — verify it produces a fully-formed + # instance with only the standalone-form fields set. + def test_for_standalone_with_run_id + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: 'act-1', activity_run_id: 'ar-1' + ) + assert_equal 'act-1', ref.activity_id + assert_equal 'ar-1', ref.activity_run_id + assert_nil ref.workflow_id + assert_nil ref.run_id + assert_predicate ref, :standalone? + end + + # `activity_run_id` is optional — nil targets "latest run" semantics. + def test_for_standalone_without_run_id + ref = Temporalio::Client::ActivityIDReference.for_standalone(activity_id: 'act-1') + assert_equal 'act-1', ref.activity_id + assert_nil ref.activity_run_id + assert_nil ref.workflow_id + assert_nil ref.run_id + assert_predicate ref, :standalone? + end + + # Both shapes must coexist on the same class without polluting each other. + def test_two_forms_are_independent + wf_ref = Temporalio::Client::ActivityIDReference.new( + workflow_id: 'wf-1', run_id: 'r-1', activity_id: 'act-1' + ) + sa_ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: 'act-2', activity_run_id: 'ar-2' + ) + # workflow-form + assert_equal 'wf-1', wf_ref.workflow_id + assert_nil wf_ref.activity_run_id + # standalone-form + assert_nil sa_ref.workflow_id + assert_equal 'ar-2', sa_ref.activity_run_id + end +end diff --git a/temporalio/test/client_activity_async_completion_test.rb b/temporalio/test/client_activity_async_completion_test.rb new file mode 100644 index 00000000..c6c877c5 --- /dev/null +++ b/temporalio/test/client_activity_async_completion_test.rb @@ -0,0 +1,179 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/activity' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' +require 'test' + +# Async completion of standalone activities. Mirrors the workflow-scoped async-completion tests in +# worker_activity_test.rb but targets standalone-form ActivityIDReferences (constructed via +# `ActivityIDReference.for_standalone(activity_id:, activity_run_id:)`). +class ClientActivityAsyncCompletionTest < Test + # Activity that signals readiness on a queue, then raises CompleteAsyncError to defer completion to + # an external async caller. The queue is purely a timing barrier — it tells the test "I've executed + # and I'm now in the async-pending state, you can complete me." The activity_id/activity_run_id are + # read from the handle, not from the queue. + class AsyncCompleteActivity < Temporalio::Activity::Definition + READY = Queue.new + + def self.wait_ready + Timeout.timeout(15) { READY.pop } + end + + def self.drain + READY.clear + end + + def execute + READY << true + raise Temporalio::Activity::CompleteAsyncError + end + end + + def with_async_worker(&) + task_queue = "saa-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + activities: [AsyncCompleteActivity] + ) + worker.run { yield task_queue } + end + + def test_async_completion_complete_by_activity_id + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30 + ) + AsyncCompleteActivity.wait_ready + # Standalone reference with activity_run_id omitted — "target the latest run." + ref = Temporalio::Client::ActivityIDReference.for_standalone(activity_id: handle.id) + env.client.async_activity_handle(ref).complete('async-done') + assert_equal 'async-done', handle.result + end + end + + def test_async_completion_complete_with_run_id + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30 + ) + AsyncCompleteActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: handle.id, + activity_run_id: handle.run_id + ) + env.client.async_activity_handle(ref).complete('with-run-id') + assert_equal 'with-run-id', handle.result + end + end + + def test_async_completion_heartbeat_standalone + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30, + heartbeat_timeout: 30 + ) + AsyncCompleteActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: handle.id, + activity_run_id: handle.run_id + ) + env.client.async_activity_handle(ref).heartbeat('hb-1', 'hb-2') + assert_equal %w[hb-1 hb-2], + env.client.data_converter.from_payloads(handle.describe.raw_info.heartbeat_details) + env.client.async_activity_handle(ref).complete('done-after-heartbeat') + assert_equal 'done-after-heartbeat', handle.result + end + end + + def test_async_completion_fail_standalone + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + # max_attempts: 1 ensures the async fail terminates the activity rather than triggering a retry + # (which would re-execute the activity, hit CompleteAsyncError again, and loop). + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30, + retry_policy: Temporalio::RetryPolicy.new(max_attempts: 1) + ) + AsyncCompleteActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: handle.id, + activity_run_id: handle.run_id + ) + env.client.async_activity_handle(ref).fail( + Temporalio::Error::ApplicationError.new('async-fail-reason', non_retryable: true) + ) + err = assert_raises(Temporalio::Error::ActivityFailedError) { handle.result } + assert_instance_of Temporalio::Error::ApplicationError, err.cause + assert_equal 'async-fail-reason', err.cause.message + end + end + + def test_async_completion_heartbeat_and_fail_standalone + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30, + heartbeat_timeout: 30, + retry_policy: Temporalio::RetryPolicy.new(max_attempts: 1) + ) + AsyncCompleteActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: handle.id, + activity_run_id: handle.run_id + ) + env.client.async_activity_handle(ref).heartbeat('hb-1', 'hb-2') + assert_equal %w[hb-1 hb-2], + env.client.data_converter.from_payloads(handle.describe.raw_info.heartbeat_details) + env.client.async_activity_handle(ref).fail( + Temporalio::Error::ApplicationError.new('hb-then-fail', non_retryable: true) + ) + err = assert_raises(Temporalio::Error::ActivityFailedError) { handle.result } + assert_instance_of Temporalio::Error::ApplicationError, err.cause + assert_equal 'hb-then-fail', err.cause.message + end + end + + def test_async_completion_report_cancellation_standalone + AsyncCompleteActivity.drain + with_async_worker do |task_queue| + handle = env.client.start_activity( + AsyncCompleteActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30, + heartbeat_timeout: 30 + ) + AsyncCompleteActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone( + activity_id: handle.id, + activity_run_id: handle.run_id + ) + handle.cancel('please-cancel') + env.client.async_activity_handle(ref).report_cancellation + err = assert_raises(Temporalio::Error::ActivityFailedError) { handle.result } + assert_instance_of Temporalio::Error::CanceledError, err.cause + end + end +end diff --git a/temporalio/test/client_activity_hints_test.rb b/temporalio/test/client_activity_hints_test.rb new file mode 100644 index 00000000..52de72ef --- /dev/null +++ b/temporalio/test/client_activity_hints_test.rb @@ -0,0 +1,214 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/activity' +require 'temporalio/client' +require 'temporalio/converters/data_converter' +require 'temporalio/converters/payload_converter' +require 'temporalio/testing' +require 'temporalio/worker' +require 'test' + +# Verify that hints (arg_hints, result_hint) propagate correctly through SAA paths. +# Uses a wrapping JSON payload converter that captures every hint passed to to_payload / from_payload. +class ClientActivityHintsTest < Test + class HintTrackingJSONConverter < Temporalio::Converters::PayloadConverter::JSONPlain + attr_accessor :outbound_hints, :inbound_hints + + def to_payload(value, hint: nil) + (@outbound_hints ||= []) << { value:, hint: } + super + end + + def from_payload(payload, hint: nil) + super.tap { |value| (@inbound_hints ||= []) << { value:, hint: } } + end + end + + class HintActivity < Temporalio::Activity::Definition + activity_arg_hint :saa_arg + activity_result_hint :saa_result + + def execute(value) + "result-of:#{value}" + end + end + + # Activity that signals readiness then raises CompleteAsyncError so the test can complete it + # externally through AsyncActivityHandle (the path being hint-tested). + class AsyncHintActivity < Temporalio::Activity::Definition + READY = Queue.new + + def self.wait_ready + Timeout.timeout(15) { READY.pop } + end + + def self.drain + READY.clear + end + + def execute + READY << true + raise Temporalio::Activity::CompleteAsyncError + end + end + + def build_tracking_client + @hint_converter = HintTrackingJSONConverter.new + Temporalio::Client.new(**env.client.options.with( + data_converter: Temporalio::Converters::DataConverter.new( + payload_converter: Temporalio::Converters::PayloadConverter::Composite.new( + *Temporalio::Converters::PayloadConverter.default.converters.values.map do |c| + c.is_a?(Temporalio::Converters::PayloadConverter::JSONPlain) ? @hint_converter : c + end + ) + ) + ).to_h) + end + + def test_activity_hints_from_definition + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [HintActivity]).run do + client.execute_activity( + HintActivity, 'hello', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + # Definition's arg_hint (:saa_arg) used when encoding the activity arg on the client side. + outbound = @hint_converter.outbound_hints || [] + arg_encode = outbound.find { |e| e[:value] == 'hello' } + refute_nil arg_encode, 'Expected client to encode the activity argument' + assert_equal :saa_arg, arg_encode[:hint], 'Client-side arg encode should use definition arg_hint' + + # Definition's result_hint (:saa_result) used when decoding the activity result on the client side. + inbound = @hint_converter.inbound_hints || [] + result_decode = inbound.find { |e| e[:value] == 'result-of:hello' } + refute_nil result_decode, 'Expected client to decode the activity result' + assert_equal :saa_result, result_decode[:hint], 'Client-side result decode should use definition result_hint' + + # Worker-side arg decode uses the definition's arg_hint. + worker_arg_decode = inbound.find { |e| e[:value] == 'hello' } + refute_nil worker_arg_decode, 'Expected worker to decode the activity argument' + assert_equal :saa_arg, worker_arg_decode[:hint], 'Worker-side arg decode should use definition arg_hint' + + # Worker-side result encode uses the definition's result_hint. + worker_result_encode = outbound.find { |e| e[:value] == 'result-of:hello' } + refute_nil worker_result_encode, 'Expected worker to encode the activity result' + assert_equal :saa_result, worker_result_encode[:hint], + 'Worker-side result encode should use definition result_hint' + end + + def test_activity_hints_call_site_override + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [HintActivity]).run do + client.execute_activity( + HintActivity, 'override', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10, + arg_hints: [:overridden_arg], + result_hint: :overridden_result + ) + end + outbound = @hint_converter.outbound_hints || [] + arg_encode = outbound.find { |e| e[:value] == 'override' } + refute_nil arg_encode + assert_equal :overridden_arg, arg_encode[:hint], + 'Call-site arg_hints should override definition arg_hint' + + inbound = @hint_converter.inbound_hints || [] + result_decode = inbound.find { |e| e[:value] == 'result-of:override' } + refute_nil result_decode + assert_equal :overridden_result, result_decode[:hint], + 'Call-site result_hint should override definition result_hint' + end + + def test_activity_hints_by_name_no_definition_lookup + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [HintActivity]).run do + client.execute_activity( + 'HintActivity', 'by-name', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + # By-name has no definition to read hints from; encode uses nil hint. + outbound = @hint_converter.outbound_hints || [] + arg_encode = outbound.find { |e| e[:value] == 'by-name' } + refute_nil arg_encode + assert_nil arg_encode[:hint], 'By-name activity should encode with nil hint' + + inbound = @hint_converter.inbound_hints || [] + result_decode = inbound.find { |e| e[:value] == 'result-of:by-name' } + refute_nil result_decode + assert_nil result_decode[:hint], 'By-name activity should decode result with nil hint' + end + + def test_async_completion_complete_uses_result_hint + AsyncHintActivity.drain + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [AsyncHintActivity]).run do + handle = client.start_activity( + AsyncHintActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30 + ) + AsyncHintActivity.wait_ready + ref = Temporalio::Client::ActivityIDReference.for_standalone(activity_id: handle.id) + client.async_activity_handle(ref).complete('async-result', result_hint: :async_complete_hint) + handle.result + end + outbound = @hint_converter.outbound_hints || [] + result_encode = outbound.find { |e| e[:value] == 'async-result' } + refute_nil result_encode, 'Expected async completion to encode the result payload' + assert_equal :async_complete_hint, result_encode[:hint], + 'AsyncActivityHandle#complete should pass result_hint into the converter' + end + + def test_activity_handle_constructor_result_hint_used_for_result_decode + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + activity_id = "act-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [HintActivity]).run do + client.execute_activity( + HintActivity, 'ctor-hint', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10 + ) + # Fresh handle with a constructor-time result_hint; no override at result() call site. + handle = client.activity_handle(activity_id, result_hint: :constructor_result_hint) + handle.result + end + inbound = @hint_converter.inbound_hints || [] + result_decode = inbound.find { |e| e[:hint] == :constructor_result_hint } + refute_nil result_decode, + 'Expected handle.result to decode with the constructor-time result_hint from activity_handle()' + assert_equal 'result-of:ctor-hint', result_decode[:value] + end + + def test_activity_handle_result_hint_override + client = build_tracking_client + task_queue = "saa-hints-tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, activities: [HintActivity]).run do + handle = client.start_activity( + HintActivity, 'override-via-result', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + # Override at the result()-call site instead of at start_activity time. + handle.result(result_hint: :result_call_override) + end + inbound = @hint_converter.inbound_hints || [] + result_decode = inbound.find { |e| e[:value] == 'result-of:override-via-result' } + refute_nil result_decode + assert_equal :result_call_override, result_decode[:hint] + end +end diff --git a/temporalio/test/client_activity_interceptor_chain_test.rb b/temporalio/test/client_activity_interceptor_chain_test.rb new file mode 100644 index 00000000..00bb6411 --- /dev/null +++ b/temporalio/test/client_activity_interceptor_chain_test.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' +require 'test' + +class ClientActivityInterceptorChainTest < Test + class SimpleActivity < Temporalio::Activity::Definition + def execute + 'ok' + end + end + + # Records the order of calls through the interceptor chain. + class RecordingInterceptor + include Temporalio::Client::Interceptor + + attr_reader :events + + def initialize(name, events_array) + @name = name + @events = events_array + end + + def intercept_client(next_interceptor) + Outbound.new(next_interceptor, @name, @events) + end + + class Outbound < Temporalio::Client::Interceptor::Outbound + def initialize(next_interceptor, name, events) + super(next_interceptor) + @name = name + @events = events + end + + def start_activity(input) + @events << "#{@name}:start_activity" + super + end + + def describe_activity(input) + @events << "#{@name}:describe_activity" + super + end + + def cancel_activity(input) + @events << "#{@name}:cancel_activity" + super + end + + def terminate_activity(input) + @events << "#{@name}:terminate_activity" + super + end + + def list_activities(input) + @events << "#{@name}:list_activities" + super + end + + def count_activities(input) + @events << "#{@name}:count_activities" + super + end + + def fetch_activity_outcome(input) + @events << "#{@name}:fetch_activity_outcome" + super + end + end + end + + def client_with_interceptors(interceptors) + Temporalio::Client.new(**env.client.options.with(interceptors: interceptors).to_h) + end + + def with_activity_worker(client, activities, &) + task_queue = "saa-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new( + client: client, + task_queue: task_queue, + activities: activities + ) + worker.run { yield task_queue } + end + + def test_start_activity_interceptor_is_called + events = [] + interceptor = RecordingInterceptor.new('A', events) + client = client_with_interceptors([interceptor]) + with_activity_worker(client, [SimpleActivity]) do |task_queue| + client.execute_activity( + SimpleActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + # execute_activity is start_activity + result (which calls fetch_activity_outcome). + assert_includes events, 'A:start_activity' + assert_includes events, 'A:fetch_activity_outcome' + end + + def test_describe_terminate_interceptors_called + events = [] + interceptor = RecordingInterceptor.new('A', events) + client = client_with_interceptors([interceptor]) + with_activity_worker(client, [SimpleActivity]) do |task_queue| + handle = client.start_activity( + SimpleActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + handle.describe + handle.terminate('test') + end + assert_includes events, 'A:describe_activity' + assert_includes events, 'A:terminate_activity' + end + + def test_cancel_activity_interceptor_called + events = [] + interceptor = RecordingInterceptor.new('A', events) + client = client_with_interceptors([interceptor]) + with_activity_worker(client, [SimpleActivity]) do |task_queue| + handle = client.start_activity( + SimpleActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + # SimpleActivity is fast; the cancel RPC may race the activity's completion and the server + # may reject it. The interceptor still runs client-side before the RPC, which is what we're + # asserting. Swallow any RPC error. + begin + handle.cancel('test') + rescue Temporalio::Error::RPCError + # expected: cancel rejected after activity already completed + end + end + assert_includes events, 'A:cancel_activity' + end + + def test_list_and_count_activities_interceptors_called + events = [] + interceptor = RecordingInterceptor.new('A', events) + client = client_with_interceptors([interceptor]) + client.list_activities('').to_a + client.count_activities('') + assert_includes events, 'A:list_activities' + assert_includes events, 'A:count_activities' + end + + def test_two_interceptors_called_in_order_on_start + events = [] + a = RecordingInterceptor.new('A', events) + b = RecordingInterceptor.new('B', events) + # First-added is outermost. + client = client_with_interceptors([a, b]) + with_activity_worker(client, [SimpleActivity]) do |task_queue| + client.execute_activity( + SimpleActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + start_events = events.select { |e| e.end_with?(':start_activity') } + assert_equal %w[A:start_activity B:start_activity], start_events + end + + def test_interceptor_list_order_determines_call_order + events = [] + b = RecordingInterceptor.new('B', events) + a = RecordingInterceptor.new('A', events) + # Order reversed from previous test. + client = client_with_interceptors([b, a]) + with_activity_worker(client, [SimpleActivity]) do |task_queue| + client.execute_activity( + SimpleActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + # B is first-added, so it's outermost now. + start_events = events.select { |e| e.end_with?(':start_activity') } + assert_equal %w[B:start_activity A:start_activity], start_events + end +end diff --git a/temporalio/test/client_activity_test.rb b/temporalio/test/client_activity_test.rb new file mode 100644 index 00000000..5a60872c --- /dev/null +++ b/temporalio/test/client_activity_test.rb @@ -0,0 +1,722 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' +require 'test' + +class ClientActivityTest < Test + class SimpleActivity < Temporalio::Activity::Definition + activity_arg_hint :saa_arg + activity_result_hint :saa_result + + def execute(value) + "saa: #{value}" + end + end + + class VoidActivity < Temporalio::Activity::Definition + def execute + # Returns nil implicitly. + end + end + + class SlowActivity < Temporalio::Activity::Definition + def execute + Temporalio::Activity::Context.current.heartbeat + sleep 0.1 until Temporalio::Activity::Context.current.cancellation.canceled? + raise Temporalio::Error::CanceledError, 'canceled' + end + end + + class FailingActivity < Temporalio::Activity::Definition + def execute + raise Temporalio::Error::ApplicationError.new('intentional failure', 'detail1', non_retryable: true) + end + end + + # Sleeps for `delay_seconds` then returns "delayed:#{delay}". Used by tests that need + # observable blocking behavior on the client side. + class DelayedActivity < Temporalio::Activity::Definition + def execute(delay_seconds) + sleep(delay_seconds) + "delayed:#{delay_seconds}" + end + end + + # Dynamic activity (no name) — declared only to verify that Client#start_activity rejects it. + # Never registered with a worker in these tests. + class DynamicActivity < Temporalio::Activity::Definition + activity_dynamic + + def execute(*_args) + 'dynamic' + end + end + + # Fails on the first attempt, succeeds on the second. Used to force a retry so `attempt > 1` is + # observable. Uses `Activity::Context.current.info.attempt` rather than class-level state so it's + # safe to share across parallel tests. + class RetryOnceActivity < Temporalio::Activity::Definition + def execute + if Temporalio::Activity::Context.current.info.attempt < 2 + raise Temporalio::Error::ApplicationError, 'retryable failure on attempt 1' + end + + 'succeeded-after-retry' + end + end + + # Run a worker with the supplied activities for the body of the block and yield the task_queue. + def with_activity_worker(activities, &) + task_queue = "saa-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + activities: activities + ) + worker.run { yield task_queue } + end + + def test_execute_activity_simple_with_result + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + SimpleActivity, + 'hi', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: hi', result + end + end + + def test_execute_activity_void_result + with_activity_worker([VoidActivity]) do |task_queue| + result = env.client.execute_activity( + VoidActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_nil result + end + end + + def test_execute_activity_by_name + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + 'SimpleActivity', + 'by-name', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: by-name', result + end + end + + def test_execute_activity_by_symbol + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + :SimpleActivity, + 'by-symbol', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: by-symbol', result + end + end + + def test_execute_activity_by_definition_instance + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + SimpleActivity.new, + 'by-instance', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: by-instance', result + end + end + + def test_execute_activity_by_definition_info + with_activity_worker([SimpleActivity]) do |task_queue| + info = Temporalio::Activity::Definition::Info.from_activity(SimpleActivity) + result = env.client.execute_activity( + info, + 'by-info', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: by-info', result + end + end + + def test_start_activity_rejects_dynamic_activity + err = assert_raises(ArgumentError) do + env.client.start_activity( + DynamicActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: 'unreached-tq', + start_to_close_timeout: 10 + ) + end + assert_match(/dynamic activity/i, err.message) + end + + def test_start_activity_rejects_junk_param + err = assert_raises(ArgumentError) do + env.client.start_activity( + 42, # steep:ignore + id: "act-#{SecureRandom.uuid}", + task_queue: 'unreached-tq', + start_to_close_timeout: 10 + ) + end + assert_match(/not an activity/i, err.message) + end + + def test_start_activity_rejects_empty_activity_id + err = assert_raises(ArgumentError) do + env.client.start_activity( + SimpleActivity, 'x', + id: '', + task_queue: 'unreached-tq', + start_to_close_timeout: 10 + ) + end + assert_match(/activity_id is required/i, err.message) + end + + def test_start_activity_rejects_empty_task_queue + err = assert_raises(ArgumentError) do + env.client.start_activity( + SimpleActivity, 'x', + id: "act-#{SecureRandom.uuid}", + task_queue: '', + start_to_close_timeout: 10 + ) + end + assert_match(/task_queue is required/i, err.message) + end + + def test_start_activity_rejects_missing_timeouts + err = assert_raises(ArgumentError) do + env.client.start_activity( + SimpleActivity, 'x', + id: "act-#{SecureRandom.uuid}", + task_queue: 'unreached-tq' + ) + end + assert_match(/schedule_to_close_timeout or start_to_close_timeout/i, err.message) + end + + def test_start_activity_already_started_throws + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 30 + ) + err = assert_raises(Temporalio::Error::ActivityAlreadyStartedError) do + env.client.start_activity( + SlowActivity, + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 30, + id_conflict_policy: Temporalio::ActivityIDConflictPolicy::FAIL + ) + end + assert_equal activity_id, err.activity_id + handle.terminate + end + end + + def test_only_schedule_to_close_timeout_is_valid + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + SimpleActivity, + 'only-schedule-to-close', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + schedule_to_close_timeout: 10 + ) + assert_equal 'saa: only-schedule-to-close', result + end + end + + def test_only_start_to_close_timeout_is_valid + with_activity_worker([SimpleActivity]) do |task_queue| + result = env.client.execute_activity( + SimpleActivity, + 'only-start-to-close', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: only-start-to-close', result + end + end + + def test_get_activity_result_failure_throws_activity_failed_error + with_activity_worker([FailingActivity]) do |task_queue| + err = assert_raises(Temporalio::Error::ActivityFailedError) do + env.client.execute_activity( + FailingActivity, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + end + assert_instance_of Temporalio::Error::ApplicationError, err.cause + assert_equal 'intentional failure', err.cause.message + assert_equal ['detail1'], err.cause.details + end + end + + def test_describe_running_and_terminated_is_accurate + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 30 + ) + + desc = handle.describe + assert_equal activity_id, desc.activity_id + assert_equal 'SlowActivity', desc.activity_type + # Status should be RUNNING (1). + assert_equal Temporalio::Client::ActivityExecutionStatus::RUNNING, desc.status + + handle.terminate('test-termination') + # After terminate, status should reach TERMINATED eventually. + assert_eventually do + d = handle.describe + assert_equal Temporalio::Client::ActivityExecutionStatus::TERMINATED, d.status + end + end + end + + def test_describe_raw_info_matches_typed_accessors + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + env.client.execute_activity( + SimpleActivity, + 'raw-info', + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 10 + ) + desc = env.client.activity_handle(activity_id).describe + assert_equal desc.raw_description.info.activity_id, desc.activity_id + assert_equal desc.raw_description.info.activity_type.name, desc.activity_type + assert_equal desc.raw_description.info.task_queue, desc.task_queue + assert_equal desc.raw_description.info.attempt, desc.attempt + end + end + + def test_terminate_running_activity_result_throws_terminated_error + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 30 + ) + handle.terminate('intentional') + err = assert_raises(Temporalio::Error::ActivityFailedError) do + handle.result + end + assert_instance_of Temporalio::Error::TerminatedError, err.cause + end + end + + def test_get_activity_result_polls_until_activity_completes + # Genuinely test the blocking behavior of result(): start a slow activity, call result, + # and assert both that we waited long enough for the activity to finish AND that we got + # the right value back. A SimpleActivity-based test wouldn't prove polling at all because + # the activity is already done by the time result() asks the server. + delay = 2.0 + with_activity_worker([DelayedActivity]) do |task_queue| + handle = env.client.start_activity( + DelayedActivity, delay, + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 30 + ) + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = handle.result + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_equal "delayed:#{delay}", result + assert_operator elapsed, :>=, delay * 0.75, + "Expected result() to block for at least #{delay * 0.75}s (proving long-poll), got #{elapsed}s" + end + end + + def test_get_activity_result_reissues_poll_across_server_long_poll_deadline + # Verifies that fetch_activity_outcome reissues PollActivityExecution when the server's + # long-poll deadline expires before the activity reaches a terminal state. Simulates the + # server returning an empty PollActivityExecutionResponse (no `outcome`) for the first few + # calls — exactly what the server returns when its long-poll deadline expires — by wrapping + # the workflow_service so the loop is forced to iterate across at least one such "deadline." + # Without the loop, the first empty response would cause ActivityHandle#result to raise + # "Activity completed but outcome is missing from server response." + with_activity_worker([SimpleActivity]) do |task_queue| + ws = env.client.workflow_service + original_poll = ws.method(:poll_activity_execution) + empty_responses_remaining = 2 + poll_count = 0 + ws.define_singleton_method(:poll_activity_execution) do |req, **kwargs| + poll_count += 1 + if empty_responses_remaining.positive? + empty_responses_remaining -= 1 + # Server's long-poll deadline expired without the activity reaching a terminal state. + Temporalio::Api::WorkflowService::V1::PollActivityExecutionResponse.new + else + original_poll.call(req, **kwargs) + end + end + + begin + result = env.client.execute_activity( + SimpleActivity, 'reissue', + id: "act-#{SecureRandom.uuid}", + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_equal 'saa: reissue', result + assert_operator poll_count, :>=, 3, + 'Expected at least 3 PollActivityExecution calls (2 injected empties + 1 real), ' \ + "got #{poll_count}" + ensure + ws.singleton_class.send(:remove_method, :poll_activity_execution) + end + end + end + + def test_list_activities_simple_list_is_accurate + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + env.client.execute_activity( + SimpleActivity, + 'listed', + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_eventually do + found = env.client.list_activities("ActivityId=\"#{activity_id}\"").to_a + refute_empty found, "Expected at least one activity matching ActivityId=#{activity_id}" + assert_equal activity_id, found.first.activity_id + end + end + end + + def test_list_activities_paginates + # Smoke test for the pagination loop in Implementation#list_activities. Wraps + # workflow_service.list_activity_executions to return two canned pages — the first with a + # non-empty next_page_token, the second (in response to that token) empty — and asserts the + # Enumerator transparently follows the token and yields executions from both pages. + ws = env.client.workflow_service + call_count = 0 + received_tokens = [] + ws.define_singleton_method(:list_activity_executions) do |req, **_kwargs| + call_count += 1 + received_tokens << req.next_page_token.dup + next_token = call_count == 1 ? 'page-2-token' : '' + Temporalio::Api::WorkflowService::V1::ListActivityExecutionsResponse.new( + executions: [ + Temporalio::Api::Activity::V1::ActivityExecutionListInfo.new( + activity_id: "act-page-#{call_count}", + activity_type: Temporalio::Api::Common::V1::ActivityType.new(name: 'SimpleActivity') + ) + ], + next_page_token: next_token + ) + end + + begin + results = env.client.list_activities('').to_a + assert_equal 2, call_count, "pagination loop should have made 2 RPC calls; made #{call_count}" + assert_equal %w[act-page-1 act-page-2], results.map(&:activity_id) + # First call has no page token; second call carries the token from the first response. + assert_empty received_tokens[0] + assert_equal 'page-2-token', received_tokens[1] + ensure + ws.singleton_class.send(:remove_method, :list_activity_executions) + end + end + + def test_count_activities_simple_count_is_accurate + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + env.client.execute_activity( + SimpleActivity, + 'counted', + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 10 + ) + assert_eventually do + count = env.client.count_activities("ActivityId=\"#{activity_id}\"") + assert_kind_of Temporalio::Client::ActivityExecutionCount, count + assert_equal 1, count.count + end + end + end + + def test_get_handle_with_nil_run_id + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + env.client.execute_activity( + SimpleActivity, + 'nil-run', + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 10 + ) + handle = env.client.activity_handle(activity_id) # run_id: nil + desc = handle.describe + assert_equal activity_id, desc.activity_id + assert_equal 'saa: nil-run', handle.result + end + end + + def test_activity_handle_describe_terminate_smoke + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, + task_queue: task_queue, + start_to_close_timeout: 30 + ) + desc = handle.describe + assert_equal activity_id, desc.activity_id + handle.terminate('smoke-test') + end + end + + def test_start_activity_id_reuse_policy_reject_duplicate_throws + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + # First start completes successfully. + env.client.execute_activity( + SimpleActivity, 'first', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10, + id_reuse_policy: Temporalio::ActivityIDReusePolicy::REJECT_DUPLICATE + ) + # Second start with REJECT_DUPLICATE on the same ID rejects. + assert_raises(Temporalio::Error::ActivityAlreadyStartedError) do + env.client.start_activity( + SimpleActivity, 'second', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10, + id_reuse_policy: Temporalio::ActivityIDReusePolicy::REJECT_DUPLICATE + ) + end + end + end + + def test_start_activity_id_reuse_policy_reject_duplicate_overridable_by_later_request + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + # First start with REJECT_DUPLICATE, completes. + env.client.execute_activity( + SimpleActivity, 'first', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10, + id_reuse_policy: Temporalio::ActivityIDReusePolicy::REJECT_DUPLICATE + ) + # Second start with same ID also REJECT_DUPLICATE → rejected. + assert_raises(Temporalio::Error::ActivityAlreadyStartedError) do + env.client.start_activity( + SimpleActivity, 'second', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10, + id_reuse_policy: Temporalio::ActivityIDReusePolicy::REJECT_DUPLICATE + ) + end + # Third start with ALLOW_DUPLICATE → succeeds, overriding the prior REJECT_DUPLICATE. + result = env.client.execute_activity( + SimpleActivity, 'third', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10, + id_reuse_policy: Temporalio::ActivityIDReusePolicy::ALLOW_DUPLICATE + ) + assert_equal 'saa: third', result + end + end + + def test_describe_timeouts_round_trip + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, + schedule_to_close_timeout: 60, + schedule_to_start_timeout: 30, + start_to_close_timeout: 45, + heartbeat_timeout: 5 + ) + desc = handle.describe + assert_in_delta 60.0, desc.schedule_to_close_timeout, 0.5 + assert_in_delta 30.0, desc.schedule_to_start_timeout, 0.5 + assert_in_delta 45.0, desc.start_to_close_timeout, 0.5 + assert_in_delta 5.0, desc.heartbeat_timeout, 0.5 + handle.terminate('cleanup') + end + end + + def test_describe_priority_round_trip + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + priority = Temporalio::Priority.new(priority_key: 3) + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + priority: priority + ) + desc = handle.describe + assert_equal 3, desc.priority.priority_key + handle.terminate('cleanup') + end + end + + def test_describe_static_summary_and_details_set_at_start + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + static_summary: 'my activity summary', + static_details: 'my activity details' + ) + desc = handle.describe + assert_equal 'my activity summary', desc.static_summary + assert_equal 'my activity details', desc.static_details + handle.terminate('cleanup') + end + end + + def test_describe_retry_policy_round_trip + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + retry_policy = Temporalio::RetryPolicy.new( + initial_interval: 1.5, + backoff_coefficient: 2.5, + max_interval: 30.0, + max_attempts: 7 + ) + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + retry_policy: retry_policy + ) + desc = handle.describe + rp = desc.retry_policy + assert_in_delta 1.5, rp.initial_interval, 0.01 + assert_in_delta 2.5, rp.backoff_coefficient, 0.01 + assert_in_delta 30.0, rp.max_interval, 0.01 + assert_equal 7, rp.max_attempts + handle.terminate('cleanup') + end + end + + def test_cancel_running_activity_transitions_to_canceled + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + heartbeat_timeout: 5 + ) + # Wait until the activity has actually started before cancelling. + assert_eventually do + desc = handle.describe + assert_operator desc.attempt, :>=, 1 + end + handle.cancel('test-cancel') + # Activity observes cancellation, raises CanceledError; server records CANCELED. + assert_eventually do + desc = handle.describe + assert_equal Temporalio::Client::ActivityExecutionStatus::CANCELED, desc.status + end + # handle.result should raise ActivityFailedError with a CanceledError cause. + err = assert_raises(Temporalio::Error::ActivityFailedError) { handle.result } + assert_instance_of Temporalio::Error::CanceledError, err.cause + end + end + + def test_describe_canceled_reason_after_cancel + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + handle = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30 + ) + handle.cancel('user-cancel-reason') + assert_eventually do + desc = handle.describe + assert_equal 'user-cancel-reason', desc.canceled_reason + assert_equal Temporalio::Client::ActivityExecutionStatus::CANCELED, desc.status + end + err = assert_raises(Temporalio::Error::ActivityFailedError) { handle.result } + assert_instance_of Temporalio::Error::CanceledError, err.cause + end + end + + def test_describe_attempt_starts_at_one + with_activity_worker([SimpleActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + env.client.execute_activity( + SimpleActivity, 'attempt', + id: activity_id, task_queue: task_queue, start_to_close_timeout: 10 + ) + desc = env.client.activity_handle(activity_id).describe + assert_equal 1, desc.attempt + end + end + + def test_describe_attempt_after_retry + with_activity_worker([RetryOnceActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + result = env.client.execute_activity( + RetryOnceActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + retry_policy: Temporalio::RetryPolicy.new( + initial_interval: 0.1, backoff_coefficient: 1.0, max_interval: 0.1, max_attempts: 3 + ) + ) + assert_equal 'succeeded-after-retry', result + desc = env.client.activity_handle(activity_id).describe + assert_equal 2, desc.attempt + end + end + + def test_id_conflict_policy_use_existing_returns_handle_for_running + with_activity_worker([SlowActivity]) do |task_queue| + activity_id = "act-#{SecureRandom.uuid}" + first = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30 + ) + # Second start with same id + USE_EXISTING returns a handle to the running activity. + second = env.client.start_activity( + SlowActivity, + id: activity_id, task_queue: task_queue, start_to_close_timeout: 30, + id_conflict_policy: Temporalio::ActivityIDConflictPolicy::USE_EXISTING + ) + # Both handles target the same activity_id (same activity_run_id from the server too). + assert_equal first.id, second.id + assert_equal first.run_id, second.run_id + first.terminate('cleanup') + end + end +end diff --git a/temporalio/test/client_workflow_interceptor_chain_test.rb b/temporalio/test/client_workflow_interceptor_chain_test.rb new file mode 100644 index 00000000..e56bfa95 --- /dev/null +++ b/temporalio/test/client_workflow_interceptor_chain_test.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' +require 'temporalio/workflow' +require 'test' + +# Multi-interceptor ordering tests for the workflow client-call chain. +class ClientWorkflowInterceptorChainTest < Test + class SimpleWorkflow < Temporalio::Workflow::Definition + def execute + 'ok' + end + end + + class RecordingInterceptor + include Temporalio::Client::Interceptor + + def initialize(name, events_array) + @name = name + @events = events_array + end + + def intercept_client(next_interceptor) + Outbound.new(next_interceptor, @name, @events) + end + + class Outbound < Temporalio::Client::Interceptor::Outbound + def initialize(next_interceptor, name, events) + super(next_interceptor) + @name = name + @events = events + end + + def start_workflow(input) + @events << "#{@name}:start_workflow" + super + end + + def describe_workflow(input) + @events << "#{@name}:describe_workflow" + super + end + + def cancel_workflow(input) + @events << "#{@name}:cancel_workflow" + super + end + + def terminate_workflow(input) + @events << "#{@name}:terminate_workflow" + super + end + + def list_workflow_page(input) + @events << "#{@name}:list_workflow_page" + super + end + + def count_workflows(input) + @events << "#{@name}:count_workflows" + super + end + + def fetch_workflow_history_events(input) + @events << "#{@name}:fetch_workflow_history_events" + super + end + end + end + + def client_with_interceptors(interceptors) + Temporalio::Client.new(**env.client.options.with(interceptors: interceptors).to_h) + end + + def with_workflow_worker(client, &) + task_queue = "wf-tq-#{SecureRandom.uuid}" + worker = Temporalio::Worker.new( + client: client, + task_queue: task_queue, + workflows: [SimpleWorkflow] + ) + worker.run { yield task_queue } + end + + def test_two_workflow_interceptors_called_in_order_on_start + events = [] + a = RecordingInterceptor.new('A', events) + b = RecordingInterceptor.new('B', events) + # First-added is outermost, per the YARD doc on client.rb's interceptors param. + client = client_with_interceptors([a, b]) + with_workflow_worker(client) do |task_queue| + client.execute_workflow( + SimpleWorkflow, + id: "wf-#{SecureRandom.uuid}", + task_queue: task_queue + ) + end + start_events = events.select { |e| e.end_with?(':start_workflow') } + assert_equal %w[A:start_workflow B:start_workflow], start_events + end + + def test_workflow_interceptor_list_order_determines_call_order + events = [] + b = RecordingInterceptor.new('B', events) + a = RecordingInterceptor.new('A', events) + # Reverse order. B is now first-added, so B is outermost. + client = client_with_interceptors([b, a]) + with_workflow_worker(client) do |task_queue| + client.execute_workflow( + SimpleWorkflow, + id: "wf-#{SecureRandom.uuid}", + task_queue: task_queue + ) + end + start_events = events.select { |e| e.end_with?(':start_workflow') } + assert_equal %w[B:start_workflow A:start_workflow], start_events + end +end diff --git a/temporalio/test/sig/activity_info_standalone_test.rbs b/temporalio/test/sig/activity_info_standalone_test.rbs new file mode 100644 index 00000000..6f887545 --- /dev/null +++ b/temporalio/test/sig/activity_info_standalone_test.rbs @@ -0,0 +1,4 @@ +class ActivityInfoStandaloneTest < Test + def capture_next: -> Hash[Symbol, untyped] + def drain_captures: -> void +end diff --git a/temporalio/test/sig/client_activity_async_completion_test.rbs b/temporalio/test/sig/client_activity_async_completion_test.rbs new file mode 100644 index 00000000..4684ea82 --- /dev/null +++ b/temporalio/test/sig/client_activity_async_completion_test.rbs @@ -0,0 +1,3 @@ +class ClientActivityAsyncCompletionTest < Test + def with_async_worker: () { (String) -> untyped } -> untyped +end diff --git a/temporalio/test/sig/client_activity_hints_test.rbs b/temporalio/test/sig/client_activity_hints_test.rbs new file mode 100644 index 00000000..a267f389 --- /dev/null +++ b/temporalio/test/sig/client_activity_hints_test.rbs @@ -0,0 +1,5 @@ +class ClientActivityHintsTest < Test + @hint_converter: untyped + + def build_tracking_client: -> Temporalio::Client +end diff --git a/temporalio/test/sig/client_activity_interceptor_chain_test.rbs b/temporalio/test/sig/client_activity_interceptor_chain_test.rbs new file mode 100644 index 00000000..ec7bee6f --- /dev/null +++ b/temporalio/test/sig/client_activity_interceptor_chain_test.rbs @@ -0,0 +1,8 @@ +class ClientActivityInterceptorChainTest < Test + def client_with_interceptors: (Array[Temporalio::Client::Interceptor]) -> Temporalio::Client + + def with_activity_worker: ( + Temporalio::Client client, + Array[singleton(Temporalio::Activity::Definition)] activities + ) { (String) -> untyped } -> untyped +end diff --git a/temporalio/test/sig/client_activity_test.rbs b/temporalio/test/sig/client_activity_test.rbs new file mode 100644 index 00000000..b24c7004 --- /dev/null +++ b/temporalio/test/sig/client_activity_test.rbs @@ -0,0 +1,3 @@ +class ClientActivityTest < Test + def with_activity_worker: (Array[singleton(Temporalio::Activity::Definition)] activities) { (String) -> untyped } -> untyped +end diff --git a/temporalio/test/sig/client_workflow_interceptor_chain_test.rbs b/temporalio/test/sig/client_workflow_interceptor_chain_test.rbs new file mode 100644 index 00000000..b21d2773 --- /dev/null +++ b/temporalio/test/sig/client_workflow_interceptor_chain_test.rbs @@ -0,0 +1,7 @@ +class ClientWorkflowInterceptorChainTest < Test + def client_with_interceptors: (Array[Temporalio::Client::Interceptor]) -> Temporalio::Client + + def with_workflow_worker: ( + Temporalio::Client client + ) { (String) -> untyped } -> untyped +end diff --git a/temporalio/test/worker_activity_test.rb b/temporalio/test/worker_activity_test.rb index e0ccaaa5..782a293f 100644 --- a/temporalio/test/worker_activity_test.rb +++ b/temporalio/test/worker_activity_test.rb @@ -584,7 +584,7 @@ def execute info = Temporalio::Activity::Context.current.info @task_token.push(info.task_token) @id_ref.push(Temporalio::Client::ActivityIDReference.new( - workflow_id: info.workflow_id, + workflow_id: info.workflow_id || raise, run_id: info.workflow_run_id, activity_id: info.activity_id )) @@ -1040,7 +1040,7 @@ def test_context_instance class ClientAccessActivity < Temporalio::Activity::Definition def execute desc = Temporalio::Activity::Context.current.client.workflow_handle( - Temporalio::Activity::Context.current.info.workflow_id + Temporalio::Activity::Context.current.info.workflow_id || raise ).describe desc.raw_description.pending_activities.first.activity_type.name end diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 7860a14d..c279be96 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -2703,7 +2703,7 @@ def test_last_failure class LeftoverWaitActivity < Temporalio::Activity::Definition def execute ctx = Temporalio::Activity::Context.current - ctx.client.workflow_handle(ctx.info.workflow_id).signal(LeftoverWaitWorkflow.some_signal) + ctx.client.workflow_handle(ctx.info.workflow_id || raise).signal(LeftoverWaitWorkflow.some_signal) end end