diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java index b6f97febe..71788efcc 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java @@ -30,7 +30,8 @@ public interface ActivityHandle extends UntypedActivityHandle { /** * Blocks until the standalone activity completes and returns the typed result, or throws if the - * client-side timeout expires first. + * + *

client-side timeout expires first. * * @param timeout maximum time to wait * @param unit unit of {@code timeout} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java new file mode 100644 index 000000000..4de519bc9 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClient.java @@ -0,0 +1,151 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.lang.reflect.Type; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +/** + * Client for managing standalone Nexus operation executions. Obtain an instance via {@link + * #newInstance(WorkflowServiceStubs)} or {@link #newInstance(WorkflowServiceStubs, + * NexusClientOptions)}. Do not create this object per request; share it for the lifetime of the + * process. + * + *

Standalone Nexus operations run independently of any workflow — they are scheduled, monitored, + * and managed directly through this client (and the service-bound clients it produces) rather than + * from within a workflow execution. + * + *

To start operations, build a service-bound client and call {@code start}/{@code execute}: + * + *

{@code
+ * NexusClient client = NexusClient.newInstance(stubs, options);
+ *
+ * // Typed: bind to an @ServiceInterface and invoke a method reference.
+ * NexusServiceClient svc =
+ *     NexusServiceClient.newInstance(MyService.class, "my-endpoint", stubs, options);
+ * String result = svc.execute(MyService::greet, "world");
+ *
+ * // Untyped: dispatch by operation name string.
+ * UntypedNexusServiceClient untyped =
+ *     client.newUntypedNexusServiceClient("my-endpoint", "MyService");
+ * UntypedNexusOperationHandle handle = untyped.start("greet", null, "world");
+ * }
+ * + *

To act on an existing operation (describe, cancel, terminate, get result), obtain a handle via + * {@link #getHandle}: + * + *

{@code
+ * NexusOperationHandle handle = client.getHandle(operationId, runId, String.class);
+ * String result = handle.getResult();
+ * handle.cancel("user requested");
+ * }
+ * + *

For visibility queries across all operations in the namespace, see {@link + * #listNexusOperationExecutions} and {@link #countNexusOperationExecutions}. + * + * @see NexusServiceClient + * @see UntypedNexusServiceClient + * @see NexusOperationHandle + */ +@Experimental +public interface NexusClient { + + /** + * Creates a client with default {@link NexusClientOptions}. + * + * @param service gRPC stubs connected to a Temporal Service endpoint + */ + static NexusClient newInstance(WorkflowServiceStubs service) { + return NexusClientImpl.newInstance(service, NexusClientOptions.getDefaultInstance()); + } + + /** + * Creates a client with the supplied options. + * + * @param service gRPC stubs connected to a Temporal Service endpoint + * @param options namespace, data converter, interceptors, and defaults applied to operations + * started through this client + */ + static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { + return NexusClientImpl.newInstance(service, options); + } + + /** Returns the underlying gRPC stubs this client routes RPCs through. */ + WorkflowServiceStubs getWorkflowServiceStubs(); + + /** + * Returns an untyped handle to an existing operation execution, targeting the latest run. To bind + * a result type, wrap the handle with {@link NexusOperationHandle#fromUntyped}. + * + * @param operationId the user-assigned operation ID + * @return an untyped handle + */ + UntypedNexusOperationHandle getHandle(String operationId); + + /** + * Returns an untyped handle to an existing operation execution, optionally pinned to a specific + * run. + * + * @param operationId the user-assigned operation ID + * @param runId the server-assigned run ID, or {@code null} to target the latest run + * @return an untyped handle + */ + UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId); + + /** + * Returns a typed handle to an existing operation execution, bound to {@code resultClass}. + * + * @param operationId the user-assigned operation ID + * @param runId the server-assigned run ID, or {@code null} to target the latest run + * @param resultClass expected result type + * @param result type + */ + NexusOperationHandle getHandle( + String operationId, @Nullable String runId, Class resultClass); + + /** + * Returns a typed handle to an existing operation execution, bound to {@code resultClass}/{@code + * resultType}. Use the {@code resultType} variant when the result is a generic type whose + * parameters cannot be captured by {@link Class} alone (e.g. {@code List}). + * + * @param operationId the user-assigned operation ID + * @param runId the server-assigned run ID, or {@code null} to target the latest run + * @param resultClass expected result class + * @param resultType generic type for deserialization; may be {@code null} + * @param result type + */ + NexusOperationHandle getHandle( + String operationId, @Nullable String runId, Class resultClass, @Nullable Type resultType); + + /** + * Builds an untyped service-bound client targeting the given endpoint and service. Use this to + * dispatch operations by name string when no service interface is available. + * + * @param endpoint Nexus endpoint name registered on the Temporal Service + * @param serviceName Nexus service name on that endpoint + */ + UntypedNexusServiceClient newUntypedNexusServiceClient(String endpoint, String serviceName); + + /** + * Returns a stream of standalone Nexus operation executions matching the given visibility query. + * The stream paginates lazily over server-side results — pages are fetched on demand as the + * stream is consumed. + * + * @param query Temporal visibility query string, or {@code null} to return all executions in the + * client namespace + * @return a lazy stream of matching executions + */ + Stream listNexusOperationExecutions(@Nullable String query); + + /** + * Returns the count of standalone Nexus operation executions matching the given visibility query, + * optionally with aggregation groups. + * + * @param query Temporal visibility query string, or {@code null} to count all executions in the + * client namespace + * @return execution count, optionally with aggregation groups when the query uses {@code GROUP + * BY} + */ + NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java new file mode 100644 index 000000000..4976a15f0 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java @@ -0,0 +1,211 @@ +package io.temporal.client; + +import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread; + +import com.google.protobuf.ByteString; +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.internal.WorkflowThreadMarker; +import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs; +import io.temporal.internal.client.NexusOperationHandleImpl; +import io.temporal.internal.client.RootNexusClientInvoker; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.client.external.GenericWorkflowClientImpl; +import io.temporal.serviceclient.MetricsTag; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Experimental +public class NexusClientImpl implements NexusClient { + + private static final Logger log = LoggerFactory.getLogger(NexusClientImpl.class); + + private final WorkflowServiceStubs workflowServiceStubs; + private final NexusClientOptions options; + private final GenericWorkflowClient genericClient; + private final Scope metricsScope; + private final NexusClientCallsInterceptor nexusClientCallsInvoker; + private final List interceptors; + + public static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { + enforceNonWorkflowThread(); + return WorkflowThreadMarker.protectFromWorkflowThread( + new NexusClientImpl(service, options), NexusClient.class); + } + + NexusClientImpl(WorkflowServiceStubs workflowServiceStubs, NexusClientOptions options) { + workflowServiceStubs = + new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); + this.workflowServiceStubs = workflowServiceStubs; + this.options = options; + this.metricsScope = + workflowServiceStubs + .getOptions() + .getMetricsScope() + .tagged(MetricsTag.defaultTags(options.getNamespace())); + this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope); + this.interceptors = options.getInterceptors(); + this.nexusClientCallsInvoker = initializeClientInvoker(); + if (log.isDebugEnabled()) { + log.debug( + "NexusClient initialized: namespace={}, interceptors={}", + options.getNamespace(), + interceptors.size()); + } + } + + private NexusClientCallsInterceptor initializeClientInvoker() { + NexusClientCallsInterceptor invoker = new RootNexusClientInvoker(genericClient, options); + for (NexusClientInterceptor clientInterceptor : interceptors) { + NexusClientCallsInterceptor wrapped = clientInterceptor.nexusClientCallsInterceptor(invoker); + if (wrapped == null) { + throw new IllegalStateException( + "NexusClientInterceptor " + + clientInterceptor.getClass().getName() + + " returned null from nexusClientCallsInterceptor; expected a non-null" + + " NexusClientCallsInterceptor wrapping the supplied next link"); + } + invoker = wrapped; + } + return invoker; + } + + @Override + public WorkflowServiceStubs getWorkflowServiceStubs() { + return workflowServiceStubs; + } + + @Override + public UntypedNexusOperationHandle getHandle(String operationId) { + return getHandle(operationId, null); + } + + @Override + public UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId) { + return new NexusOperationHandleImpl( + operationId, runId, nexusClientCallsInvoker, options.getDataConverter()); + } + + @Override + public NexusOperationHandle getHandle( + String operationId, @Nullable String runId, Class resultClass) { + return getHandle(operationId, runId, resultClass, null); + } + + @Override + public NexusOperationHandle getHandle( + String operationId, + @Nullable String runId, + Class resultClass, + @Nullable java.lang.reflect.Type resultType) { + return NexusOperationHandle.fromUntyped(getHandle(operationId, runId), resultClass, resultType); + } + + @Override + public UntypedNexusServiceClient newUntypedNexusServiceClient( + String endpoint, String serviceName) { + return new UntypedNexusServiceClientImpl( + nexusClientCallsInvoker, endpoint, serviceName, options); + } + + /** + * Returns the head of the interceptor chain. Package-private so service-client builders can route + * start RPCs through the chain without exposing it on the public {@link NexusClient} interface. + */ + NexusClientCallsInterceptor getNexusClientCallsInvoker() { + return nexusClientCallsInvoker; + } + + private static final int DEFAULT_LIST_PAGE_SIZE = 1000; + + @Override + public Stream listNexusOperationExecutions( + @Nullable String query) { + Iterator iter = + new ListPageIterator(nexusClientCallsInvoker, query, DEFAULT_LIST_PAGE_SIZE); + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED | Spliterator.NONNULL), + false); + } + + @Override + public NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query) { + CountNexusOperationExecutionsOutput out = + nexusClientCallsInvoker.countNexusOperationExecutions( + new CountNexusOperationExecutionsInput(query)); + List publicGroups = + out.getGroups().stream() + .map( + g -> + new NexusOperationExecutionCount.AggregationGroup( + g.getCount(), g.getGroupValues())) + .collect(Collectors.toList()); + return new NexusOperationExecutionCount(out.getCount(), publicGroups); + } + + /** Lazily fetches pages from the interceptor and flattens them into a single iteration. */ + private static final class ListPageIterator implements Iterator { + private final NexusClientCallsInterceptor invoker; + private final @Nullable String query; + private final int pageSize; + private Iterator current = + java.util.Collections.emptyIterator(); + private @Nullable ByteString nextPageToken = null; + private boolean exhausted = false; + + ListPageIterator(NexusClientCallsInterceptor invoker, @Nullable String query, int pageSize) { + this.invoker = invoker; + this.query = query; + this.pageSize = pageSize; + } + + @Override + public boolean hasNext() { + while (!current.hasNext() && !exhausted) { + fetchNextPage(); + } + return current.hasNext(); + } + + @Override + public NexusOperationExecutionMetadata next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.next(); + } + + private void fetchNextPage() { + ListNexusOperationExecutionsOutput page = + invoker.listNexusOperationExecutions( + new ListNexusOperationExecutionsInput(query, pageSize, nextPageToken)); + current = + page.getOperations().stream() + .map(NexusOperationExecutionMetadata::fromListInfo) + .iterator(); + ByteString token = page.getNextPageToken(); + if (token == null || token.isEmpty()) { + exhausted = true; + nextPageToken = null; + } else { + nextPageToken = token; + } + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOptions.java new file mode 100644 index 000000000..1e1e0e983 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusClientOptions.java @@ -0,0 +1,155 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.GlobalDataConverter; +import io.temporal.common.interceptors.NexusClientInterceptor; +import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.List; + +/** + * Options that configure a {@link NexusClient} (and the service-bound clients it produces). + * + *

Carries only client-wide settings (namespace, data converter, interceptors). Per-call settings + * — operation ID, timeouts, search attributes, summary, id-reuse/conflict policies — belong on + * {@link StartNexusOperationOptions}. + * + *

Obtain a builder via {@link #newBuilder()} or copy an existing instance via {@link + * #newBuilder(NexusClientOptions)}. The default instance ({@link #getDefaultInstance()}) is + * suitable when only the namespace is required and the {@link GlobalDataConverter} is appropriate. + * + *

{@code
+ * NexusClientOptions options =
+ *     NexusClientOptions.newBuilder()
+ *         .setNamespace("default")
+ *         .setDataConverter(myDataConverter)
+ *         .build();
+ * }
+ */ +@Experimental +public class NexusClientOptions { + + private final String namespace; + private final List interceptors; + private final DataConverter dataConverter; + private final String identity; + + private NexusClientOptions( + String namespace, + List interceptors, + DataConverter dataConverter, + String identity) { + this.namespace = namespace; + this.interceptors = interceptors; + this.dataConverter = dataConverter; + this.identity = identity; + } + + /** Get the namespace this client will operate on. */ + public String getNamespace() { + return namespace; + } + + /** Get the interceptors of this client. */ + public List getInterceptors() { + return interceptors; + } + + /** Get the data converter used to serialize Nexus operation inputs and deserialize results. */ + public DataConverter getDataConverter() { + return dataConverter; + } + + /** + * Human-readable identity of this client. Stamped onto outgoing write requests (start, cancel, + * terminate) so server-side history and audit trails can attribute the action to a caller. + */ + public String getIdentity() { + return identity; + } + + /** Returns a fresh builder. */ + public static NexusClientOptions.Builder newBuilder() { + return new NexusClientOptions.Builder(); + } + + /** Returns a builder seeded with the values from {@code options}. */ + public static NexusClientOptions.Builder newBuilder(NexusClientOptions options) { + return new NexusClientOptions.Builder(options); + } + + private static final NexusClientOptions DEFAULT_INSTANCE; + + /** + * Returns an options instance with all defaults. Note this leaves namespace unset; callers + * usually need to specify a namespace. + */ + public static NexusClientOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + static { + DEFAULT_INSTANCE = NexusClientOptions.newBuilder().build(); + } + + /** Builder for {@link NexusClientOptions}. */ + public static class Builder { + private String namespace; + private List interceptors = Collections.emptyList(); + private DataConverter dataConverter = GlobalDataConverter.get(); + private String identity; + + private Builder() {} + + private Builder(NexusClientOptions options) { + if (options == null) { + return; + } + namespace = options.namespace; + interceptors = options.interceptors; + dataConverter = options.dataConverter; + identity = options.identity; + } + + /** Set the namespace this client will operate on. */ + public NexusClientOptions.Builder setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + /** Set the interceptors for this client, but don't allow null lists to happen. */ + public NexusClientOptions.Builder setInterceptors(List interceptors) { + if (interceptors == null) { + this.interceptors = Collections.emptyList(); + } else { + this.interceptors = interceptors; + } + return this; + } + + /** + * Set the data converter used to serialize Nexus operation inputs and deserialize results. + * Defaults to {@link GlobalDataConverter#get()}. + */ + public NexusClientOptions.Builder setDataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + /** + * Override the human-readable identity stamped on outgoing write requests. Defaults to the JVM + * runtime name (typically {@code pid@host}). + */ + public NexusClientOptions.Builder setIdentity(String identity) { + this.identity = identity; + return this; + } + + public NexusClientOptions build() { + String resolvedIdentity = + identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity; + return new NexusClientOptions(namespace, interceptors, dataConverter, resolvedIdentity); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationCancellationInfo.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationCancellationInfo.java new file mode 100644 index 000000000..54b99a5b7 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationCancellationInfo.java @@ -0,0 +1,96 @@ +package io.temporal.client; + +import io.temporal.api.enums.v1.NexusOperationCancellationState; +import io.temporal.api.nexus.v1.NexusOperationExecutionCancellationInfo; +import io.temporal.common.Experimental; +import io.temporal.common.converter.DataConverter; +import io.temporal.internal.common.ProtobufTimeUtils; +import java.time.Instant; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Information about a cancellation request issued against a standalone Nexus operation execution. + * Returned by {@link NexusOperationExecutionDescription#getCancellationInfo()}. + */ +@Experimental +public final class NexusOperationCancellationInfo { + + private final NexusOperationExecutionCancellationInfo info; + private final DataConverter dataConverter; + + NexusOperationCancellationInfo( + NexusOperationExecutionCancellationInfo info, DataConverter dataConverter) { + this.info = info; + this.dataConverter = dataConverter; + } + + /** The raw protobuf info returned by the server. */ + @Nonnull + public NexusOperationExecutionCancellationInfo getRawInfo() { + return info; + } + + /** Time when cancellation was originally requested. */ + @Nullable + public Instant getRequestedTime() { + return info.hasRequestedTime() + ? ProtobufTimeUtils.toJavaInstant(info.getRequestedTime()) + : null; + } + + /** Current state of cancellation-request delivery to the operation handler. */ + @Nonnull + public NexusOperationCancellationState getState() { + return info.getState(); + } + + /** + * Current attempt number for delivering the cancel request to the handler. Represents a minimum + * bound — the value is incremented after the attempt completes. + */ + public int getAttempt() { + return info.getAttempt(); + } + + /** Time the last cancel-delivery attempt completed. */ + @Nullable + public Instant getLastAttemptCompleteTime() { + return info.hasLastAttemptCompleteTime() + ? ProtobufTimeUtils.toJavaInstant(info.getLastAttemptCompleteTime()) + : null; + } + + /** Failure from the last cancel-delivery attempt. {@code null} if no failure has occurred yet. */ + @Nullable + public Exception getLastAttemptFailure() { + return info.hasLastAttemptFailure() + ? dataConverter.failureToException(info.getLastAttemptFailure()) + : null; + } + + /** Time when the next cancel-delivery attempt is scheduled. */ + @Nullable + public Instant getNextAttemptScheduleTime() { + return info.hasNextAttemptScheduleTime() + ? ProtobufTimeUtils.toJavaInstant(info.getNextAttemptScheduleTime()) + : null; + } + + /** + * Additional context for why cancel delivery is blocked. Set only when {@link #getState()} + * indicates a blocked state. + */ + @Nullable + public String getBlockedReason() { + String r = info.getBlockedReason(); + return r.isEmpty() ? null : r; + } + + /** The human-readable reason supplied with the original cancel request, if any. */ + @Nullable + public String getReason() { + String r = info.getReason(); + return r.isEmpty() ? null : r; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionCount.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionCount.java new file mode 100644 index 000000000..271671cf5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionCount.java @@ -0,0 +1,94 @@ +package io.temporal.client; + +import io.temporal.api.common.v1.Payload; +import io.temporal.common.Experimental; +import io.temporal.internal.common.SearchAttributesUtil; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + +/** Result of counting standalone Nexus operation executions. */ +@Experimental +public class NexusOperationExecutionCount { + + /** An individual aggregation group. */ + @Experimental + public static class AggregationGroup { + private final List> groupValues; + private final long count; + + /** Construct from raw payload group values; values are decoded eagerly. */ + public AggregationGroup(long count, List groupValues) { + this.groupValues = + groupValues.stream().map(SearchAttributesUtil::decode).collect(Collectors.toList()); + this.count = count; + } + + /** Values of the group, decoded from search attribute payloads. */ + public List> getGroupValues() { + return groupValues; + } + + /** Count of operations in this group. */ + public long getCount() { + return count; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AggregationGroup that = (AggregationGroup) o; + return count == that.count && Objects.equals(groupValues, that.groupValues); + } + + @Override + public int hashCode() { + return Objects.hash(groupValues, count); + } + + @Override + public String toString() { + return "AggregationGroup{groupValues=" + groupValues + ", count=" + count + '}'; + } + } + + private final long count; + private final List groups; + + public NexusOperationExecutionCount(long count, List groups) { + this.count = count; + this.groups = Collections.unmodifiableList(groups); + } + + /** Total number of operation executions matching the query. */ + public long getCount() { + return count; + } + + /** Aggregation groups returned by the service. Empty if no grouping was requested. */ + @Nonnull + public List getGroups() { + return groups; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NexusOperationExecutionCount that = (NexusOperationExecutionCount) o; + return count == that.count && Objects.equals(groups, that.groups); + } + + @Override + public int hashCode() { + return Objects.hash(count, groups); + } + + @Override + public String toString() { + return "NexusOperationExecutionCount{count=" + count + ", groups=" + groups + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionDescription.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionDescription.java new file mode 100644 index 000000000..1c4670126 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionDescription.java @@ -0,0 +1,261 @@ +package io.temporal.client; + +import io.temporal.api.enums.v1.PendingNexusOperationState; +import io.temporal.api.nexus.v1.NexusOperationExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionResponse; +import io.temporal.common.Experimental; +import io.temporal.common.converter.DataConverter; +import io.temporal.internal.common.ProtobufTimeUtils; +import io.temporal.internal.common.SearchAttributesUtil; +import java.lang.reflect.Type; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Detailed information about a standalone Nexus operation execution, returned by {@link + * UntypedNexusOperationHandle#describe()}. + */ +@Experimental +public final class NexusOperationExecutionDescription extends NexusOperationExecutionMetadata { + + private final DescribeNexusOperationExecutionResponse response; + private final NexusOperationExecutionInfo info; + private final DataConverter dataConverter; + + public NexusOperationExecutionDescription( + DescribeNexusOperationExecutionResponse response, + DataConverter dataConverter, + String namespace) { + super( + /* rawListInfo= */ null, + response.getInfo().getOperationId(), + nullIfEmpty(response.getInfo().getRunId()), + response.getInfo().getEndpoint(), + response.getInfo().getService(), + response.getInfo().getOperation(), + response.getInfo().hasScheduleTime() + ? ProtobufTimeUtils.toJavaInstant(response.getInfo().getScheduleTime()) + : Instant.EPOCH, + response.getInfo().hasCloseTime() + ? ProtobufTimeUtils.toJavaInstant(response.getInfo().getCloseTime()) + : null, + response.getInfo().getStatus(), + SearchAttributesUtil.decodeTyped(response.getInfo().getSearchAttributes()), + response.getInfo().getStateTransitionCount(), + response.getInfo().hasExecutionDuration() + ? ProtobufTimeUtils.toJavaDuration(response.getInfo().getExecutionDuration()) + : null); + this.response = response; + this.info = response.getInfo(); + this.dataConverter = dataConverter; + } + + private static @Nullable String nullIfEmpty(String s) { + return s == null || s.isEmpty() ? null : s; + } + + /** Underlying proto response. Exposed while the Nexus SDK surface is still experimental. */ + @Nonnull + public DescribeNexusOperationExecutionResponse getRawResponse() { + return response; + } + + /** The raw protobuf info returned by the server for this operation execution. */ + @Nonnull + public NexusOperationExecutionInfo getRawInfo() { + return info; + } + + /** Current attempt number for the start request (starts at 1). */ + public int getAttempt() { + return info.getAttempt(); + } + + /** + * Detailed run state (e.g. scheduled, started, backing off). Only meaningful when {@link + * #getStatus()} is {@code NEXUS_OPERATION_EXECUTION_STATUS_RUNNING}. + */ + @Nonnull + public PendingNexusOperationState getRunState() { + return info.getState(); + } + + /** Total time the caller is willing to wait for the operation to complete, including retries. */ + @Nullable + public Duration getScheduleToCloseTimeout() { + return info.hasScheduleToCloseTimeout() + ? ProtobufTimeUtils.toJavaDuration(info.getScheduleToCloseTimeout()) + : null; + } + + /** Maximum time the start request may wait before being delivered to the handler. */ + @Nullable + public Duration getScheduleToStartTimeout() { + return info.hasScheduleToStartTimeout() + ? ProtobufTimeUtils.toJavaDuration(info.getScheduleToStartTimeout()) + : null; + } + + /** Maximum time for a single start-request attempt. */ + @Nullable + public Duration getStartToCloseTimeout() { + return info.hasStartToCloseTimeout() + ? ProtobufTimeUtils.toJavaDuration(info.getStartToCloseTimeout()) + : null; + } + + /** Scheduled time plus schedule-to-close timeout. */ + @Nullable + public Instant getExpirationTime() { + return info.hasExpirationTime() + ? ProtobufTimeUtils.toJavaInstant(info.getExpirationTime()) + : null; + } + + /** Time the last start-request attempt completed (succeeded or failed). */ + @Nullable + public Instant getLastAttemptCompleteTime() { + return info.hasLastAttemptCompleteTime() + ? ProtobufTimeUtils.toJavaInstant(info.getLastAttemptCompleteTime()) + : null; + } + + /** Failure from the last start-request attempt. {@code null} if no failure has occurred. */ + @Nullable + public Exception getLastAttemptFailure() { + return info.hasLastAttemptFailure() + ? dataConverter.failureToException(info.getLastAttemptFailure()) + : null; + } + + /** Time when the next start-request attempt will be scheduled. */ + @Nullable + public Instant getNextAttemptScheduleTime() { + return info.hasNextAttemptScheduleTime() + ? ProtobufTimeUtils.toJavaInstant(info.getNextAttemptScheduleTime()) + : null; + } + + /** Cancellation details if cancellation was requested; {@code null} otherwise. */ + @Nullable + public NexusOperationCancellationInfo getCancellationInfo() { + return info.hasCancellationInfo() + ? new NexusOperationCancellationInfo(info.getCancellationInfo(), dataConverter) + : null; + } + + /** + * Additional context for why the operation is blocked. Set only when {@link #getRunState()} is + * {@code BLOCKED}. + */ + @Nullable + public String getBlockedReason() { + String r = info.getBlockedReason(); + return r.isEmpty() ? null : r; + } + + /** + * Server-generated request ID used as an idempotency token when submitting the start request to + * the operation handler. + */ + @Nullable + public String getHandlerRequestId() { + String r = info.getRequestId(); + return r.isEmpty() ? null : r; + } + + /** Operation token returned by the handler; set only for asynchronous operations after start. */ + @Nullable + public String getOperationToken() { + String t = info.getOperationToken(); + return t.isEmpty() ? null : t; + } + + /** Identity of the client that started this operation. */ + @Nullable + public String getIdentity() { + String i = info.getIdentity(); + return i.isEmpty() ? null : i; + } + + /** + * Whether the operation input payload is present on this description. Set only when {@link + * UntypedNexusOperationHandle#describe()} was called with {@code includeInput=true}. + */ + public boolean hasInput() { + return response.hasInput(); + } + + /** + * Deserializes the operation input into the given type. Returns {@link Optional#empty()} if no + * input is present (either the operation was started without one or {@code includeInput} was + * false on the describe call). + * + * @param valueType the class to deserialize the input into + */ + public Optional getInput(Class valueType) { + return getInput(valueType, valueType); + } + + /** + * Deserializes the operation input into the given generic type. Returns {@link Optional#empty()} + * if no input is present. + * + * @param valueType the class to deserialize the input into + * @param genericType the generic type for deserialization; may equal {@code valueType} + */ + public Optional getInput(Class valueType, Type genericType) { + if (!response.hasInput()) { + return Optional.empty(); + } + return Optional.ofNullable( + dataConverter.fromPayload(response.getInput(), valueType, genericType)); + } + + /** + * Whether the operation's success result is present. Set only when {@link + * UntypedNexusOperationHandle#describe()} was called with {@code includeOutcome=true} and the + * operation completed successfully. + */ + public boolean hasResult() { + return response.hasResult(); + } + + /** + * Deserializes the operation's success result. Returns {@link Optional#empty()} if no result is + * present (operation still running, completed with a failure, or {@code includeOutcome} was + * false). + * + * @param valueType the class to deserialize the result into + */ + public Optional getResult(Class valueType) { + return getResult(valueType, valueType); + } + + /** + * Deserializes the operation's success result into the given generic type. Returns {@link + * Optional#empty()} if no result is present. + * + * @param valueType the class to deserialize the result into + * @param genericType the generic type for deserialization; may equal {@code valueType} + */ + public Optional getResult(Class valueType, Type genericType) { + if (!response.hasResult()) { + return Optional.empty(); + } + return Optional.ofNullable( + dataConverter.fromPayload(response.getResult(), valueType, genericType)); + } + + /** + * Operation failure as a thrown-style exception. Returns {@code null} if the operation did not + * complete with a failure or if {@code includeOutcome} was false on the describe call. + */ + @Nullable + public Exception getFailure() { + return response.hasFailure() ? dataConverter.failureToException(response.getFailure()) : null; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionMetadata.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionMetadata.java new file mode 100644 index 000000000..075e2e42f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationExecutionMetadata.java @@ -0,0 +1,214 @@ +package io.temporal.client; + +import io.temporal.api.enums.v1.NexusOperationExecutionStatus; +import io.temporal.api.nexus.v1.NexusOperationExecutionListInfo; +import io.temporal.common.Experimental; +import io.temporal.common.SearchAttributes; +import io.temporal.internal.common.ProtobufTimeUtils; +import io.temporal.internal.common.SearchAttributesUtil; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Information about a standalone Nexus operation execution returned by {@link + * NexusClient#listNexusOperationExecutions}. + */ +@Experimental +public class NexusOperationExecutionMetadata { + + private final @Nullable NexusOperationExecutionListInfo rawListInfo; + private final String operationId; + private final @Nullable String runId; + private final String endpoint; + private final String service; + private final String operation; + private final Instant scheduledTime; + private final @Nullable Instant closeTime; + private final NexusOperationExecutionStatus status; + private final SearchAttributes searchAttributes; + private final long stateTransitionCount; + private final @Nullable Duration executionDuration; + + NexusOperationExecutionMetadata( + @Nullable NexusOperationExecutionListInfo rawListInfo, + String operationId, + @Nullable String runId, + String endpoint, + String service, + String operation, + Instant scheduledTime, + @Nullable Instant closeTime, + NexusOperationExecutionStatus status, + SearchAttributes searchAttributes, + long stateTransitionCount, + @Nullable Duration executionDuration) { + this.rawListInfo = rawListInfo; + this.operationId = operationId; + this.runId = runId; + this.endpoint = endpoint; + this.service = service; + this.operation = operation; + this.scheduledTime = scheduledTime; + this.closeTime = closeTime; + this.status = status; + this.searchAttributes = searchAttributes; + this.stateTransitionCount = stateTransitionCount; + this.executionDuration = executionDuration; + } + + public static NexusOperationExecutionMetadata fromListInfo(NexusOperationExecutionListInfo info) { + String runId = info.getRunId(); + return new NexusOperationExecutionMetadata( + info, + info.getOperationId(), + runId.isEmpty() ? null : runId, + info.getEndpoint(), + info.getService(), + info.getOperation(), + ProtobufTimeUtils.toJavaInstant(info.getScheduleTime()), + info.hasCloseTime() ? ProtobufTimeUtils.toJavaInstant(info.getCloseTime()) : null, + info.getStatus(), + SearchAttributesUtil.decodeTyped(info.getSearchAttributes()), + info.getStateTransitionCount(), + info.hasExecutionDuration() + ? ProtobufTimeUtils.toJavaDuration(info.getExecutionDuration()) + : null); + } + + /** + * The raw protobuf list info from the server. Only present when this instance was created via + * {@link #fromListInfo}. + */ + @Nullable + public NexusOperationExecutionListInfo getRawListInfo() { + return rawListInfo; + } + + /** The user-assigned identifier for this operation. */ + @Nonnull + public String getOperationId() { + return operationId; + } + + /** The server-assigned run ID for this operation execution. May be {@code null}. */ + @Nullable + public String getRunId() { + return runId; + } + + /** The Nexus endpoint name this operation targets. */ + @Nonnull + public String getEndpoint() { + return endpoint; + } + + /** The Nexus service name on the endpoint. */ + @Nonnull + public String getService() { + return service; + } + + /** The Nexus operation name within the service. */ + @Nonnull + public String getOperation() { + return operation; + } + + /** Time when the operation was originally scheduled via a {@code StartNexusOperation} request. */ + @Nonnull + public Instant getScheduledTime() { + return scheduledTime; + } + + /** Time the operation transitioned to a terminal status. {@code null} while still running. */ + @Nullable + public Instant getCloseTime() { + return closeTime; + } + + /** General status of the operation execution. */ + @Nonnull + public NexusOperationExecutionStatus getStatus() { + return status; + } + + /** Search attributes attached to this operation execution. */ + @Nonnull + public SearchAttributes getSearchAttributes() { + return searchAttributes; + } + + /** Server-tracked count of state transitions; updated on terminal status. */ + public long getStateTransitionCount() { + return stateTransitionCount; + } + + /** Close time minus scheduled time. {@code null} while still running. */ + @Nullable + public Duration getExecutionDuration() { + return executionDuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NexusOperationExecutionMetadata that = (NexusOperationExecutionMetadata) o; + return stateTransitionCount == that.stateTransitionCount + && Objects.equals(operationId, that.operationId) + && Objects.equals(runId, that.runId) + && Objects.equals(endpoint, that.endpoint) + && Objects.equals(service, that.service) + && Objects.equals(operation, that.operation) + && Objects.equals(scheduledTime, that.scheduledTime) + && Objects.equals(closeTime, that.closeTime) + && status == that.status + && Objects.equals(searchAttributes, that.searchAttributes) + && Objects.equals(executionDuration, that.executionDuration); + } + + @Override + public int hashCode() { + return Objects.hash( + operationId, + runId, + endpoint, + service, + operation, + scheduledTime, + closeTime, + status, + searchAttributes, + stateTransitionCount, + executionDuration); + } + + @Override + public String toString() { + return "NexusOperationExecutionMetadata{" + + "operationId='" + + operationId + + "', runId='" + + runId + + "', endpoint='" + + endpoint + + "', service='" + + service + + "', operation='" + + operation + + "', status=" + + status + + ", scheduledTime=" + + scheduledTime + + ", closeTime=" + + closeTime + + ", executionDuration=" + + executionDuration + + ", searchAttributes=" + + searchAttributes + + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandle.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandle.java new file mode 100644 index 000000000..2445c492f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandle.java @@ -0,0 +1,82 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +/** + * A typed handle to a standalone Nexus operation execution. Extends {@link + * UntypedNexusOperationHandle} with typed result methods bound to a known result type. + * + *

Obtain an instance via {@link NexusServiceClient} or by wrapping an {@link + * UntypedNexusOperationHandle} (returned by {@link NexusClient#getHandle(String)}) with {@link + * #fromUntyped(UntypedNexusOperationHandle, Class)}. + * + * @param the result type of the Nexus operation + * @see UntypedNexusOperationHandle + * @see NexusServiceClient + * @see NexusClient + */ +@Experimental +public interface NexusOperationHandle extends UntypedNexusOperationHandle { + + /** + * Wraps an {@link UntypedNexusOperationHandle} with a known result type. + * + * @param handle the untyped handle to wrap + * @param resultClass the class to deserialize the result into + * @return a typed handle + */ + static NexusOperationHandle fromUntyped( + UntypedNexusOperationHandle handle, Class resultClass) { + return fromUntyped(handle, resultClass, null); + } + + /** + * Wraps an {@link UntypedNexusOperationHandle} with a known result type for generic types. Pass a + * non-null {@code resultType} when the result is a generic type whose parameters cannot be + * captured by {@link Class} alone (e.g. {@code List}). + * + * @param handle the untyped handle to wrap + * @param resultClass the class to deserialize the result into + * @param resultType the generic type; may be {@code null} + * @return a typed handle + */ + static NexusOperationHandle fromUntyped( + UntypedNexusOperationHandle handle, Class resultClass, @Nullable Type resultType) { + return new NexusOperationHandleImpl<>(handle, resultClass, resultType); + } + + /** + * Blocks until the Nexus operation completes and returns the typed result. + * + * @throws RuntimeException if the operation failed, timed out, or was cancelled + */ + R getResult(); + + /** + * Blocks until the Nexus operation completes and returns the typed result, or throws if the + * client-side timeout expires first. + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + * @throws RuntimeException if the operation failed, timed out on the server, or was cancelled + * @throws TimeoutException if {@code timeout} expires before the operation completes + */ + R getResult(long timeout, TimeUnit unit) throws TimeoutException; + + /** Returns a future that completes when the Nexus operation completes with the typed result. */ + CompletableFuture getResultAsync(); + + /** + * Returns a future that completes with the typed result, or completes exceptionally with a {@link + * TimeoutException} if {@code timeout} elapses before the operation completes. + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + */ + CompletableFuture getResultAsync(long timeout, TimeUnit unit); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandleImpl.java new file mode 100644 index 000000000..4c886fd18 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusOperationHandleImpl.java @@ -0,0 +1,124 @@ +package io.temporal.client; + +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +/** + * Package-private wrapper that adds typed result methods to an {@link UntypedNexusOperationHandle}, + * implementing {@link NexusOperationHandle}{@code }. Created via {@link + * NexusOperationHandle#fromUntyped(UntypedNexusOperationHandle, Class)} or {@link + * NexusOperationHandle#fromUntyped(UntypedNexusOperationHandle, Class, Type)}. + */ +final class NexusOperationHandleImpl implements NexusOperationHandle { + + private final UntypedNexusOperationHandle delegate; + private final Class resultClass; + private final @Nullable Type resultType; + + NexusOperationHandleImpl( + UntypedNexusOperationHandle delegate, Class resultClass, @Nullable Type resultType) { + this.delegate = delegate; + this.resultClass = resultClass; + this.resultType = resultType; + } + + @Override + public R getResult() { + return delegate.getResult(resultClass, resultType); + } + + @Override + public R getResult(long timeout, TimeUnit unit) throws TimeoutException { + return delegate.getResult(timeout, unit, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync() { + return delegate.getResultAsync(resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { + return delegate.getResultAsync(timeout, unit, resultClass, resultType); + } + + @Override + public String getNexusOperationId() { + return delegate.getNexusOperationId(); + } + + @Override + public @Nullable String getNexusOperationRunId() { + return delegate.getNexusOperationRunId(); + } + + @Override + public T getResult(Class clazz) { + return delegate.getResult(clazz); + } + + @Override + public T getResult(Class clazz, @Nullable Type type) { + return delegate.getResult(clazz, type); + } + + @Override + public T getResult(long timeout, TimeUnit unit, Class clazz) throws TimeoutException { + return delegate.getResult(timeout, unit, clazz, null); + } + + @Override + public T getResult(long timeout, TimeUnit unit, Class clazz, @Nullable Type type) + throws TimeoutException { + return delegate.getResult(timeout, unit, clazz, type); + } + + @Override + public CompletableFuture getResultAsync(Class clazz) { + return delegate.getResultAsync(clazz); + } + + @Override + public CompletableFuture getResultAsync(Class clazz, @Nullable Type type) { + return delegate.getResultAsync(clazz, type); + } + + @Override + public CompletableFuture getResultAsync(long timeout, TimeUnit unit, Class clazz) { + return delegate.getResultAsync(timeout, unit, clazz, null); + } + + @Override + public CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class clazz, @Nullable Type type) { + return delegate.getResultAsync(timeout, unit, clazz, type); + } + + @Override + public NexusOperationExecutionDescription describe() { + return delegate.describe(); + } + + @Override + public void cancel() { + delegate.cancel(); + } + + @Override + public void cancel(@Nullable String reason) { + delegate.cancel(reason); + } + + @Override + public void terminate() { + delegate.terminate(); + } + + @Override + public void terminate(@Nullable String reason) { + delegate.terminate(reason); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java new file mode 100644 index 000000000..acebd48af --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClient.java @@ -0,0 +1,123 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +/** + * Typed client for invoking standalone Nexus operations on a specific service interface {@code T}. + * + *

Operations are dispatched via method references (or {@link BiFunction} lambdas) that target + * methods on {@code T}; the client extracts the operation name from the invocation and delegates to + * {@link NexusClient}. For visibility queries (list/count) across operations, use {@link + * NexusClient} directly. + * + * @param the Nexus service interface this client is bound to + * @see NexusClient + * @see UntypedNexusServiceClient + */ +@Experimental +public interface NexusServiceClient extends UntypedNexusServiceClient { + + /** + * Creates a client bound to {@code service} that dispatches calls via {@code endpoint} using + * default client options. + * + * @param service the Nexus service interface class + * @param endpoint the Nexus endpoint name as configured on the server + * @param stubs gRPC stubs for talking to the Temporal service + * @return a new typed client + */ + static NexusServiceClient newInstance( + Class service, String endpoint, WorkflowServiceStubs stubs) { + return newInstance(service, endpoint, stubs, NexusClientOptions.getDefaultInstance()); + } + + /** + * Creates a client bound to {@code service} that dispatches calls via {@code endpoint} using the + * supplied client options. + * + * @param service the Nexus service interface class + * @param endpoint the Nexus endpoint name as configured on the server + * @param stubs gRPC stubs for talking to the Temporal service + * @param options client-wide options (namespace, identity, interceptors, etc.) + * @return a new typed client + */ + static NexusServiceClient newInstance( + Class service, String endpoint, WorkflowServiceStubs stubs, NexusClientOptions options) { + return NexusServiceClientImpl.newInstance(service, endpoint, stubs, options); + } + + /** + * Executes an operation synchronously. Equivalent to {@link #start(BiFunction, Object)} followed + * by {@link NexusOperationHandle#getResult()}. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + * @return the operation result + * @throws RuntimeException if the operation failed, timed out, or was cancelled + */ + default R execute(BiFunction operation, U input) { + return start(operation, input).getResult(); + } + + /** + * Executes an operation synchronously with per-call options. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + * @param options per-call options controlling timeouts, search attributes, etc. + * @return the operation result + * @throws RuntimeException if the operation failed, timed out, or was cancelled + */ + default R execute( + BiFunction operation, U input, StartNexusOperationOptions options) { + return start(operation, input, options).getResult(); + } + + /** + * Starts an operation and returns a typed handle for tracking its execution. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + * @return a typed handle bound to the started operation + */ + default NexusOperationHandle start(BiFunction operation, U input) { + return start(operation, input, StartNexusOperationOptions.getDefaultInstance()); + } + + /** + * Starts an operation with per-call options and returns a typed handle. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + * @param options per-call options controlling timeouts, search attributes, etc. + * @return a typed handle bound to the started operation + */ + NexusOperationHandle start( + BiFunction operation, U input, StartNexusOperationOptions options); + + /** + * Async variant of {@link #execute(BiFunction, Object)}. Returns a {@link CompletableFuture} that + * completes with the typed result, or completes exceptionally if the operation fails. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + */ + default CompletableFuture executeAsync(BiFunction operation, U input) { + return start(operation, input).getResultAsync(); + } + + /** + * Async variant of {@link #execute(BiFunction, Object, StartNexusOperationOptions)}. + * + * @param operation a method reference on {@code T} identifying the operation + * @param input the operation input + * @param options per-call options controlling timeouts, search attributes, etc. + */ + default CompletableFuture executeAsync( + BiFunction operation, U input, StartNexusOperationOptions options) { + return start(operation, input, options).getResultAsync(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java new file mode 100644 index 000000000..84fbab39e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/NexusServiceClientImpl.java @@ -0,0 +1,81 @@ +package io.temporal.client; + +import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread; + +import io.nexusrpc.OperationDefinition; +import io.nexusrpc.ServiceDefinition; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.internal.WorkflowThreadMarker; +import io.temporal.internal.util.MethodExtractor; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.workflow.Functions; +import java.lang.reflect.Method; +import java.util.function.BiFunction; + +/** + * Typed Nexus service client. Extracts the operation name from a {@link BiFunction} that targets a + * method on the service interface (via a {@link Proxy} of {@code T}) and delegates the start RPC to + * the interceptor chain inherited from the underlying {@link NexusClient}. + */ +@Experimental +class NexusServiceClientImpl extends UntypedNexusServiceClientImpl + implements NexusServiceClient { + + private final Class serviceInterface; + private final ServiceDefinition serviceDef; + + static NexusServiceClient newInstance( + Class service, String endpoint, WorkflowServiceStubs stubs, NexusClientOptions options) { + enforceNonWorkflowThread(); + // Build the underlying NexusClient impl directly (bypassing the wrapped factory) so we can + // hand its interceptor chain to the service client. The outer service-client proxy below + // still enforces the non-workflow-thread check at every call. + NexusClientImpl rawClient = new NexusClientImpl(stubs, options); + return WorkflowThreadMarker.protectFromWorkflowThread( + new NexusServiceClientImpl<>( + rawClient.getNexusClientCallsInvoker(), service, endpoint, options), + NexusServiceClient.class); + } + + NexusServiceClientImpl( + NexusClientCallsInterceptor invoker, + Class serviceInterface, + String endpoint, + NexusClientOptions options) { + this(invoker, serviceInterface, ServiceDefinition.fromClass(serviceInterface), endpoint, options); + } + + private NexusServiceClientImpl( + NexusClientCallsInterceptor invoker, + Class serviceInterface, + ServiceDefinition serviceDef, + String endpoint, + NexusClientOptions options) { + super(invoker, endpoint, serviceDef.getName(), options); + this.serviceInterface = serviceInterface; + this.serviceDef = serviceDef; + } + + @Override + public NexusOperationHandle start( + BiFunction operation, U input, StartNexusOperationOptions options) { + Method method = + MethodExtractor.extract(serviceInterface, (Functions.Func2) operation::apply); + OperationDefinition opDef = + serviceDef.getOperations().values().stream() + .filter(o -> method.getName().equals(o.getMethodName())) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "Method " + + method.getName() + + " is not a Nexus operation on " + + serviceInterface.getName())); + @SuppressWarnings("unchecked") + Class resultClass = (Class) method.getReturnType(); + UntypedNexusOperationHandle untyped = start(opDef.getName(), options, input); + return NexusOperationHandle.fromUntyped(untyped, resultClass, method.getGenericReturnType()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/StartNexusOperationOptions.java b/temporal-sdk/src/main/java/io/temporal/client/StartNexusOperationOptions.java new file mode 100644 index 000000000..b0e55774b --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/StartNexusOperationOptions.java @@ -0,0 +1,228 @@ +package io.temporal.client; + +import io.temporal.api.enums.v1.NexusOperationIdConflictPolicy; +import io.temporal.api.enums.v1.NexusOperationIdReusePolicy; +import io.temporal.common.Experimental; +import io.temporal.common.SearchAttributes; +import java.time.Duration; +import java.util.Objects; +import javax.annotation.Nullable; + +/** + * Per-call options for starting a standalone Nexus operation via {@link + * UntypedNexusServiceClient#start} (or its typed counterpart). + */ +@Experimental +public final class StartNexusOperationOptions { + + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(StartNexusOperationOptions options) { + return new Builder(options); + } + + private static final StartNexusOperationOptions DEFAULT_INSTANCE = newBuilder().build(); + + /** Returns an options instance with no per-call fields set. */ + public static StartNexusOperationOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + public static final class Builder { + private @Nullable String id; + private @Nullable Duration scheduleToCloseTimeout; + private @Nullable Duration scheduleToStartTimeout; + private @Nullable Duration startToCloseTimeout; + private @Nullable SearchAttributes typedSearchAttributes; + private @Nullable String summary; + private @Nullable NexusOperationIdReusePolicy idReusePolicy; + private @Nullable NexusOperationIdConflictPolicy idConflictPolicy; + + private Builder() {} + + private Builder(StartNexusOperationOptions options) { + if (options == null) { + return; + } + this.id = options.id; + this.scheduleToCloseTimeout = options.scheduleToCloseTimeout; + this.scheduleToStartTimeout = options.scheduleToStartTimeout; + this.startToCloseTimeout = options.startToCloseTimeout; + this.typedSearchAttributes = options.typedSearchAttributes; + this.summary = options.summary; + this.idReusePolicy = options.idReusePolicy; + this.idConflictPolicy = options.idConflictPolicy; + } + + /** + * Required. Unique identifier for this operation within its namespace. If left null, the SDK + * generates a random UUID. + */ + public Builder setId(@Nullable String id) { + this.id = id; + return this; + } + + /** Total time the caller is willing to wait for the operation to complete. */ + public Builder setScheduleToCloseTimeout(@Nullable Duration scheduleToCloseTimeout) { + this.scheduleToCloseTimeout = scheduleToCloseTimeout; + return this; + } + + /** Time the operation may wait in the queue before a handler picks it up. */ + public Builder setScheduleToStartTimeout(@Nullable Duration scheduleToStartTimeout) { + this.scheduleToStartTimeout = scheduleToStartTimeout; + return this; + } + + /** Maximum time for a single attempt. */ + public Builder setStartToCloseTimeout(@Nullable Duration startToCloseTimeout) { + this.startToCloseTimeout = startToCloseTimeout; + return this; + } + + /** Typed search attributes to attach to this operation execution. */ + public Builder setTypedSearchAttributes(@Nullable SearchAttributes typedSearchAttributes) { + this.typedSearchAttributes = typedSearchAttributes; + return this; + } + + /** Short summary for UI display. */ + public Builder setSummary(@Nullable String summary) { + this.summary = summary; + return this; + } + + /** Controls behavior when an operation with the same ID was previously run and is closed. */ + public Builder setIdReusePolicy(@Nullable NexusOperationIdReusePolicy idReusePolicy) { + this.idReusePolicy = idReusePolicy; + return this; + } + + /** Controls behavior when an operation with the same ID is currently running. */ + public Builder setIdConflictPolicy(@Nullable NexusOperationIdConflictPolicy idConflictPolicy) { + this.idConflictPolicy = idConflictPolicy; + return this; + } + + public StartNexusOperationOptions build() { + return new StartNexusOperationOptions(this); + } + } + + private final @Nullable String id; + private final @Nullable Duration scheduleToCloseTimeout; + private final @Nullable Duration scheduleToStartTimeout; + private final @Nullable Duration startToCloseTimeout; + private final @Nullable SearchAttributes typedSearchAttributes; + private final @Nullable String summary; + private final @Nullable NexusOperationIdReusePolicy idReusePolicy; + private final @Nullable NexusOperationIdConflictPolicy idConflictPolicy; + + private StartNexusOperationOptions(Builder builder) { + this.id = builder.id; + this.scheduleToCloseTimeout = builder.scheduleToCloseTimeout; + this.scheduleToStartTimeout = builder.scheduleToStartTimeout; + this.startToCloseTimeout = builder.startToCloseTimeout; + this.typedSearchAttributes = builder.typedSearchAttributes; + this.summary = builder.summary; + this.idReusePolicy = builder.idReusePolicy; + this.idConflictPolicy = builder.idConflictPolicy; + } + + public Builder toBuilder() { + return new Builder(this); + } + + @Nullable + public String getId() { + return id; + } + + @Nullable + public Duration getScheduleToCloseTimeout() { + return scheduleToCloseTimeout; + } + + @Nullable + public Duration getScheduleToStartTimeout() { + return scheduleToStartTimeout; + } + + @Nullable + public Duration getStartToCloseTimeout() { + return startToCloseTimeout; + } + + @Nullable + public SearchAttributes getTypedSearchAttributes() { + return typedSearchAttributes; + } + + @Nullable + public String getSummary() { + return summary; + } + + @Nullable + public NexusOperationIdReusePolicy getIdReusePolicy() { + return idReusePolicy; + } + + @Nullable + public NexusOperationIdConflictPolicy getIdConflictPolicy() { + return idConflictPolicy; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StartNexusOperationOptions that = (StartNexusOperationOptions) o; + return Objects.equals(id, that.id) + && Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout) + && Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout) + && Objects.equals(startToCloseTimeout, that.startToCloseTimeout) + && Objects.equals(typedSearchAttributes, that.typedSearchAttributes) + && Objects.equals(summary, that.summary) + && idReusePolicy == that.idReusePolicy + && idConflictPolicy == that.idConflictPolicy; + } + + @Override + public int hashCode() { + return Objects.hash( + id, + scheduleToCloseTimeout, + scheduleToStartTimeout, + startToCloseTimeout, + typedSearchAttributes, + summary, + idReusePolicy, + idConflictPolicy); + } + + @Override + public String toString() { + return "StartNexusOperationOptions{" + + "id='" + + id + + "', scheduleToCloseTimeout=" + + scheduleToCloseTimeout + + ", scheduleToStartTimeout=" + + scheduleToStartTimeout + + ", startToCloseTimeout=" + + startToCloseTimeout + + ", typedSearchAttributes=" + + typedSearchAttributes + + ", summary='" + + summary + + "', idReusePolicy=" + + idReusePolicy + + ", idConflictPolicy=" + + idConflictPolicy + + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusOperationHandle.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusOperationHandle.java new file mode 100644 index 000000000..d9f337ccf --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusOperationHandle.java @@ -0,0 +1,148 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import java.lang.reflect.Type; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +/** + * An untyped handle to a standalone Nexus operation execution. Use this to get the result, + * describe, cancel, or terminate the operation when the result type is not known at compile time. + * + *

Obtain an instance via {@link NexusClient#getHandle(String)} or as the untyped projection of a + * handle returned by {@link NexusServiceClient}. + * + * @see NexusOperationHandle + * @see NexusClient + */ +@Experimental +public interface UntypedNexusOperationHandle { + + /** The caller-assigned operation ID for this execution. Always non-null. */ + String getNexusOperationId(); + + /** + * The server-assigned run ID for this operation execution. Present when the handle was returned + * by {@code start} or when {@link NexusClient#getHandle(String, String)} was called with an + * explicit run ID. May be {@code null} when obtained via {@link NexusClient#getHandle(String)} + * without a run ID — call {@link #describe()} to retrieve the current run ID. + */ + @Nullable + String getNexusOperationRunId(); + + /** + * Blocks until the standalone Nexus operation completes and returns the typed result. Polls the + * server via long-polling. + * + * @param resultClass the class to deserialize the result into + * @throws RuntimeException if the operation failed, timed out, or was cancelled; the specific + * exception type reflects the underlying failure + */ + R getResult(Class resultClass); + + /** + * Blocks until the standalone Nexus operation completes and returns the typed result. Use this + * overload for generic return types (e.g. {@code List}). + * + * @param resultClass the class to deserialize the result into + * @param resultType the generic type to use for deserialization; may be {@code null} + * @throws RuntimeException if the operation failed, timed out, or was cancelled; the specific + * exception type reflects the underlying failure + */ + R getResult(Class resultClass, @Nullable Type resultType); + + /** + * Blocks until the standalone Nexus operation completes and returns the typed result, or throws + * if the client-side timeout expires before the operation completes. + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + * @param resultClass the class to deserialize the result into + * @throws RuntimeException if the operation failed, timed out on the server, or was cancelled + * @throws TimeoutException if the client-side {@code timeout} expires before the operation + * completes + */ + R getResult(long timeout, TimeUnit unit, Class resultClass) throws TimeoutException; + + /** + * Blocks until the standalone Nexus operation completes and returns the typed result, or throws + * if the client-side timeout expires. Use this overload for generic return types (e.g. {@code + * List}). + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + * @param resultClass the class to deserialize the result into + * @param resultType the generic type to use for deserialization; may be {@code null} + * @throws RuntimeException if the operation failed, timed out on the server, or was cancelled + * @throws TimeoutException if the client-side {@code timeout} expires before the operation + * completes + */ + R getResult(long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) + throws TimeoutException; + + /** + * Returns a future that completes when the operation completes and resolves to the typed result. + * + * @param resultClass the class to deserialize the result into + */ + CompletableFuture getResultAsync(Class resultClass); + + /** + * Returns a future that completes when the operation completes and resolves to the typed result. + * Use this overload for generic return types (e.g. {@code List}). + * + * @param resultClass the class to deserialize the result into + * @param resultType the generic type to use for deserialization; may be {@code null} + */ + CompletableFuture getResultAsync(Class resultClass, @Nullable Type resultType); + + /** + * Returns a future that completes when the operation completes, or fails with {@link + * TimeoutException} if the operation does not complete within the specified timeout. + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + * @param resultClass the class to deserialize the result into + */ + CompletableFuture getResultAsync(long timeout, TimeUnit unit, Class resultClass); + + /** + * Returns a future for generic return types with a timeout. + * + * @param timeout maximum time to wait + * @param unit unit of {@code timeout} + * @param resultClass the class to deserialize the result into + * @param resultType the generic type to use for deserialization; may be {@code null} + */ + CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType); + + /** + * Describes the current state of the Nexus operation execution. + * + * @return detailed information about the operation + */ + NexusOperationExecutionDescription describe(); + + /** Requests cancellation of the Nexus operation. */ + void cancel(); + + /** + * Requests cancellation of the Nexus operation with an optional reason. + * + * @param reason human-readable reason for cancellation, may be {@code null} + */ + void cancel(@Nullable String reason); + + /** Terminates the Nexus operation immediately, regardless of its current state. */ + void terminate(); + + /** + * Terminates the Nexus operation immediately with a reason. + * + * @param reason human-readable reason for termination, may be {@code null} + */ + void terminate(@Nullable String reason); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java new file mode 100644 index 000000000..e2343f714 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClient.java @@ -0,0 +1,64 @@ +package io.temporal.client; + +import io.temporal.common.Experimental; +import java.lang.reflect.Type; +import javax.annotation.Nullable; + +/** + * Untyped client for invoking standalone Nexus operations by operation-name string. Use this when + * the operation contract is not available as a Java service interface at compile time. For a typed + * variant, see {@link NexusServiceClient}. + * + * @see NexusServiceClient + * @see NexusClient + */ +@Experimental +public interface UntypedNexusServiceClient { + + /** + * Starts a Nexus operation by name and returns an untyped handle for tracking its execution. + * + * @param operation the operation name as registered on the service + * @param options per-call options controlling timeouts, search attributes, etc. + * @param arg the operation input; may be {@code null} + * @return an untyped handle bound to the started operation + */ + UntypedNexusOperationHandle start( + String operation, StartNexusOperationOptions options, @Nullable Object arg); + + /** + * Executes a Nexus operation synchronously by name, blocking until it completes. + * + * @param operation the operation name as registered on the service + * @param resultClass the class to deserialize the result into + * @param options per-call options controlling timeouts, search attributes, etc. + * @param arg the operation input; may be {@code null} + * @return the deserialized operation result + * @throws RuntimeException if the operation failed, timed out, or was cancelled + */ + R execute( + String operation, + Class resultClass, + StartNexusOperationOptions options, + @Nullable Object arg); + + /** + * Executes a Nexus operation synchronously by name with an explicit generic-result {@link Type}. + * Use this overload when the result is a generic type whose parameters cannot be captured by + * {@link Class} alone (e.g. {@code List}). + * + * @param operation the operation name as registered on the service + * @param resultClass the class to deserialize the result into + * @param resultType the generic type to use for deserialization + * @param options per-call options controlling timeouts, search attributes, etc. + * @param arg the operation input; may be {@code null} + * @return the deserialized operation result + * @throws RuntimeException if the operation failed, timed out, or was cancelled + */ + R execute( + String operation, + Class resultClass, + Type resultType, + StartNexusOperationOptions options, + @Nullable Object arg); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java new file mode 100644 index 000000000..30de99aa6 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/UntypedNexusServiceClientImpl.java @@ -0,0 +1,91 @@ +package io.temporal.client; + +import io.temporal.api.common.v1.Payload; +import io.temporal.common.Experimental; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionOutput; +import io.temporal.internal.client.NexusOperationHandleImpl; +import java.lang.reflect.Type; +import java.util.Collections; +import javax.annotation.Nullable; + +/** + * Untyped Nexus service client. Holds the {@link NexusClientCallsInterceptor invoker}, target + * endpoint, service name, and data converter, and translates operation-name calls into start RPCs + * routed through the interceptor chain. + */ +@Experimental +class UntypedNexusServiceClientImpl implements UntypedNexusServiceClient { + + private final NexusClientCallsInterceptor invoker; + private final String endpoint; + private final String serviceName; + private final DataConverter dataConverter; + + UntypedNexusServiceClientImpl( + NexusClientCallsInterceptor invoker, + String endpoint, + String serviceName, + NexusClientOptions clientOptions) { + if (invoker == null || endpoint == null || serviceName == null || clientOptions == null) { + throw new IllegalArgumentException( + "invoker, endpoint, serviceName, and clientOptions are all required"); + } + this.invoker = invoker; + this.endpoint = endpoint; + this.serviceName = serviceName; + this.dataConverter = clientOptions.getDataConverter(); + } + + @Override + public UntypedNexusOperationHandle start( + String operation, StartNexusOperationOptions options, @Nullable Object arg) { + Payload payload = serializeInput(arg); + StartNexusOperationExecutionInput input = + new StartNexusOperationExecutionInput( + endpoint, + serviceName, + operation, + payload, + options != null ? options : StartNexusOperationOptions.getDefaultInstance(), + Collections.emptyMap()); + StartNexusOperationExecutionOutput output = invoker.startNexusOperationExecution(input); + return new NexusOperationHandleImpl( + output.getOperationId(), output.getRunId(), invoker, dataConverter); + } + + @Override + public R execute( + String operation, + Class resultClass, + StartNexusOperationOptions options, + @Nullable Object arg) { + return execute(operation, resultClass, /* resultType= */ null, options, arg); + } + + @Override + public R execute( + String operation, + Class resultClass, + @Nullable Type resultType, + StartNexusOperationOptions options, + @Nullable Object arg) { + UntypedNexusOperationHandle handle = start(operation, options, arg); + return NexusOperationHandle.fromUntyped(handle, resultClass, resultType).getResult(); + } + + private @Nullable Payload serializeInput(@Nullable Object arg) { + if (arg == null) { + return null; + } + Class argClass = arg.getClass(); + return dataConverter + .toPayload(arg) + .orElseThrow( + () -> + new IllegalStateException( + "DataConverter returned no payload for input of type " + argClass.getName())); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java new file mode 100644 index 000000000..ff146c19f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptor.java @@ -0,0 +1,476 @@ +package io.temporal.common.interceptors; + +import com.google.protobuf.ByteString; +import io.grpc.Deadline; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.NexusOperationWaitStage; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.nexus.v1.NexusOperationExecutionListInfo; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusOperationExecutionDescription; +import io.temporal.client.NexusOperationHandle; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.common.Experimental; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Per-call interceptor for {@link NexusClient} and {@link NexusOperationHandle} operations on + * standalone Nexus operation executions. + * + *

Implementations are produced by {@link + * NexusClientInterceptor#nexusClientCallsInterceptor(NexusClientCallsInterceptor)} during {@link + * NexusClient} construction. Prefer extending {@link NexusClientCallsInterceptorBase} and + * overriding only the methods you need. + */ +@Experimental +public interface NexusClientCallsInterceptor { + + /** + * Starts a standalone Nexus operation. The endpoint, service, operation name, input, and + * scheduling options are carried in {@code input}. + * + * @param input endpoint, service name, operation name, encoded input, and start options + * @return output containing the operation ID, server-assigned run ID, and whether the operation + * was started by this call (vs. de-duplicated to an existing one) + */ + StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input); + + /** + * Returns a point-in-time snapshot of a standalone Nexus operation execution. + * + * @param input operation ID, optional run ID, and flags controlling whether to include input and + * outcome payloads + * @return output wrapping the {@link NexusOperationExecutionDescription} + */ + DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input); + + /** + * Synchronously long-polls the server until the Nexus operation reaches the wait stage requested + * in {@code input}, then returns the outcome. Blocks the calling thread for the duration. + * + * @param input operation ID, optional run ID, target wait stage, and the deadline bounding the + * poll + * @return output containing the run ID, wait stage reached, operation token, and either the + * result payload or failure (when the operation has reached a terminal stage) + */ + PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input); + + /** + * Asynchronous variant of {@link #pollNexusOperationExecution} that returns a future without + * blocking the calling thread. + * + * @param input operation ID, optional run ID, target wait stage, and the deadline bounding the + * poll + * @return a future that completes with the poll output, or completes exceptionally if the poll + * fails or the deadline expires + */ + CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input); + + /** + * Lists standalone Nexus operation executions matching a Visibility query, with paging support. + * + * @param input Visibility query string, page size, and optional next-page token from a prior call + * @return output wrapping the matching operations and the next-page token (empty when the result + * set is exhausted) + */ + ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input); + + /** + * Returns the count of standalone Nexus operation executions matching a Visibility query, + * optionally grouped by attribute. + * + * @param input Visibility query string + * @return output wrapping the total count and any aggregation groups + */ + CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input); + + /** + * Requests cancellation of a running standalone Nexus operation. The server forwards the cancel + * request to the operation handler, which may honour or ignore it. + * + * @param input operation ID, optional run ID, and optional human-readable cancellation reason + */ + void requestCancelNexusOperationExecution(RequestCancelNexusOperationExecutionInput input); + + /** + * Forcefully terminates a standalone Nexus operation. Unlike cancellation, termination is + * immediate and cannot be intercepted by the operation handler. + * + * @param input operation ID, optional run ID, and optional human-readable termination reason + */ + void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input); + + /** + * Deletes a closed standalone Nexus operation execution from the server's visibility store. The + * operation must already be in a terminal state. + * + * @param input operation ID and optional run ID + */ + void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input); + + final class StartNexusOperationExecutionInput { + private final String endpoint; + private final String service; + private final String operation; + private final @Nullable Payload input; + private final StartNexusOperationOptions options; + private final Map headers; + + public StartNexusOperationExecutionInput( + String endpoint, + String service, + String operation, + @Nullable Payload input, + StartNexusOperationOptions options, + Map headers) { + this.endpoint = endpoint; + this.service = service; + this.operation = operation; + this.input = input; + this.options = options; + this.headers = headers == null ? Collections.emptyMap() : headers; + } + + public String getEndpoint() { + return endpoint; + } + + public String getService() { + return service; + } + + public String getOperation() { + return operation; + } + + public Optional getInput() { + return Optional.ofNullable(input); + } + + public StartNexusOperationOptions getOptions() { + return options; + } + + /** + * Nexus protocol headers to forward to the handler. Interceptors implementing context + * propagation (tracing, baggage, etc.) populate this map by wrapping the call chain. + */ + public Map getHeaders() { + return headers; + } + } + + final class StartNexusOperationExecutionOutput { + private final String operationId; + private final String runId; + private final boolean started; + + public StartNexusOperationExecutionOutput(String operationId, String runId, boolean started) { + this.operationId = operationId; + this.runId = runId; + this.started = started; + } + + public String getOperationId() { + return operationId; + } + + public String getRunId() { + return runId; + } + + public boolean isStarted() { + return started; + } + } + + final class DescribeNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final boolean includeInput; + private final boolean includeOutcome; + + public DescribeNexusOperationExecutionInput( + String operationId, @Nullable String runId, boolean includeInput, boolean includeOutcome) { + this.operationId = operationId; + this.runId = runId; + this.includeInput = includeInput; + this.includeOutcome = includeOutcome; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public boolean isIncludeInput() { + return includeInput; + } + + public boolean isIncludeOutcome() { + return includeOutcome; + } + } + + final class DescribeNexusOperationExecutionOutput { + private final NexusOperationExecutionDescription description; + + public DescribeNexusOperationExecutionOutput(NexusOperationExecutionDescription description) { + this.description = description; + } + + public NexusOperationExecutionDescription getDescription() { + return description; + } + } + + final class PollNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final NexusOperationWaitStage waitStage; + private final @Nonnull Deadline deadline; + + public PollNexusOperationExecutionInput( + String operationId, + @Nullable String runId, + NexusOperationWaitStage waitStage, + @Nonnull Deadline deadline) { + this.operationId = operationId; + this.runId = runId; + this.waitStage = waitStage; + this.deadline = deadline; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public NexusOperationWaitStage getWaitStage() { + return waitStage; + } + + public Deadline getDeadline() { + return deadline; + } + } + + final class PollNexusOperationExecutionOutput { + private final String runId; + private final NexusOperationWaitStage waitStage; + private final String operationToken; + private final @Nullable Payload result; + private final @Nullable Failure failure; + + public PollNexusOperationExecutionOutput( + String runId, + NexusOperationWaitStage waitStage, + String operationToken, + @Nullable Payload result, + @Nullable Failure failure) { + this.runId = runId; + this.waitStage = waitStage; + this.operationToken = operationToken; + this.result = result; + this.failure = failure; + } + + public String getRunId() { + return runId; + } + + public NexusOperationWaitStage getWaitStage() { + return waitStage; + } + + public String getOperationToken() { + return operationToken; + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + public Optional getFailure() { + return Optional.ofNullable(failure); + } + } + + final class ListNexusOperationExecutionsInput { + private final @Nullable String query; + private final int pageSize; + private final @Nullable ByteString nextPageToken; + + public ListNexusOperationExecutionsInput( + @Nullable String query, int pageSize, @Nullable ByteString nextPageToken) { + this.query = query; + this.pageSize = pageSize; + this.nextPageToken = nextPageToken; + } + + public Optional getQuery() { + return Optional.ofNullable(query); + } + + public int getPageSize() { + return pageSize; + } + + public Optional getNextPageToken() { + return Optional.ofNullable(nextPageToken); + } + } + + final class ListNexusOperationExecutionsOutput { + private final List operations; + private final ByteString nextPageToken; + + public ListNexusOperationExecutionsOutput( + List operations, ByteString nextPageToken) { + this.operations = Collections.unmodifiableList(operations); + this.nextPageToken = nextPageToken; + } + + public List getOperations() { + return operations; + } + + public ByteString getNextPageToken() { + return nextPageToken; + } + } + + final class CountNexusOperationExecutionsInput { + private final @Nullable String query; + + public CountNexusOperationExecutionsInput(@Nullable String query) { + this.query = query; + } + + public Optional getQuery() { + return Optional.ofNullable(query); + } + } + + final class CountNexusOperationExecutionsOutput { + private final long count; + private final List groups; + + public CountNexusOperationExecutionsOutput(long count, List groups) { + this.count = count; + this.groups = Collections.unmodifiableList(groups); + } + + public long getCount() { + return count; + } + + public List getGroups() { + return groups; + } + + public static final class AggregationGroup { + private final List groupValues; + private final long count; + + public AggregationGroup(List groupValues, long count) { + this.groupValues = Collections.unmodifiableList(groupValues); + this.count = count; + } + + public List getGroupValues() { + return groupValues; + } + + public long getCount() { + return count; + } + } + } + + final class RequestCancelNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final @Nullable String reason; + + public RequestCancelNexusOperationExecutionInput( + String operationId, @Nullable String runId, @Nullable String reason) { + this.operationId = operationId; + this.runId = runId; + this.reason = reason; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public Optional getReason() { + return Optional.ofNullable(reason); + } + } + + final class TerminateNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + private final @Nullable String reason; + + public TerminateNexusOperationExecutionInput( + String operationId, @Nullable String runId, @Nullable String reason) { + this.operationId = operationId; + this.runId = runId; + this.reason = reason; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + + public Optional getReason() { + return Optional.ofNullable(reason); + } + } + + final class DeleteNexusOperationExecutionInput { + private final String operationId; + private final @Nullable String runId; + + public DeleteNexusOperationExecutionInput(String operationId, @Nullable String runId) { + this.operationId = operationId; + this.runId = runId; + } + + public String getOperationId() { + return operationId; + } + + public Optional getRunId() { + return Optional.ofNullable(runId); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java new file mode 100644 index 000000000..4bd34c6a3 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientCallsInterceptorBase.java @@ -0,0 +1,70 @@ +package io.temporal.common.interceptors; + +import io.temporal.common.Experimental; +import java.util.concurrent.CompletableFuture; + +/** + * Convenience base class for {@link NexusClientCallsInterceptor} implementations that need to + * override only a subset of methods. All methods delegate to the wrapped {@code next} interceptor. + */ +@Experimental +public class NexusClientCallsInterceptorBase implements NexusClientCallsInterceptor { + + private final NexusClientCallsInterceptor next; + + public NexusClientCallsInterceptorBase(NexusClientCallsInterceptor next) { + this.next = next; + } + + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + return next.startNexusOperationExecution(input); + } + + @Override + public DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input) { + return next.describeNexusOperationExecution(input); + } + + @Override + public PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input) { + return next.pollNexusOperationExecution(input); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input) { + return next.pollNexusOperationExecutionAsync(input); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + return next.listNexusOperationExecutions(input); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + return next.countNexusOperationExecutions(input); + } + + @Override + public void requestCancelNexusOperationExecution( + RequestCancelNexusOperationExecutionInput input) { + next.requestCancelNexusOperationExecution(input); + } + + @Override + public void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input) { + next.terminateNexusOperationExecution(input); + } + + @Override + public void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input) { + next.deleteNexusOperationExecution(input); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java new file mode 100644 index 000000000..3af217f3f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptor.java @@ -0,0 +1,24 @@ +package io.temporal.common.interceptors; + +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientOptions; +import io.temporal.common.Experimental; + +/** + * Outer interceptor for {@link NexusClient}. Implementations are registered via {@link + * NexusClientOptions.Builder#setInterceptors(java.util.List)} and consulted once during client + * construction to build the chain of {@link NexusClientCallsInterceptor}s that wraps the root + * invoker. + */ +@Experimental +public interface NexusClientInterceptor { + + /** + * Called once during {@link NexusClient} construction to build the chain of per-call + * interceptors. + * + * @param next next per-call interceptor in the chain + * @return new per-call interceptor that decorates calls to {@code next} + */ + NexusClientCallsInterceptor nexusClientCallsInterceptor(NexusClientCallsInterceptor next); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java new file mode 100644 index 000000000..b964626fd --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusClientInterceptorBase.java @@ -0,0 +1,13 @@ +package io.temporal.common.interceptors; + +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusClientInterceptor} implementations. */ +@Experimental +public class NexusClientInterceptorBase implements NexusClientInterceptor { + + @Override + public NexusClientCallsInterceptor nexusClientCallsInterceptor(NexusClientCallsInterceptor next) { + return next; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/NexusOperationHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusOperationHandleImpl.java new file mode 100644 index 000000000..875d9e0ca --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusOperationHandleImpl.java @@ -0,0 +1,232 @@ +package io.temporal.internal.client; + +import io.grpc.Deadline; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.NexusOperationWaitStage; +import io.temporal.api.failure.v1.Failure; +import io.temporal.client.NexusOperationExecutionDescription; +import io.temporal.client.UntypedNexusOperationHandle; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.DescribeNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.DescribeNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.PollNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.PollNexusOperationExecutionOutput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.RequestCancelNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.TerminateNexusOperationExecutionInput; +import java.lang.reflect.Type; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + +/** + * Implementation of {@link UntypedNexusOperationHandle} that delegates lifecycle operations through + * the interceptor chain. + */ +public final class NexusOperationHandleImpl implements UntypedNexusOperationHandle { + + private final String operationId; + private final @Nullable String runId; + private final NexusClientCallsInterceptor interceptor; + private final DataConverter dataConverter; + + public NexusOperationHandleImpl( + String operationId, + @Nullable String runId, + NexusClientCallsInterceptor interceptor, + DataConverter dataConverter) { + if (operationId == null) { + throw new IllegalArgumentException("operationId is required"); + } + if (interceptor == null) { + throw new IllegalArgumentException("interceptor is required"); + } + if (dataConverter == null) { + throw new IllegalArgumentException("dataConverter is required"); + } + this.operationId = operationId; + this.runId = runId; + this.interceptor = interceptor; + this.dataConverter = dataConverter; + } + + @Override + public String getNexusOperationId() { + return operationId; + } + + @Override + public @Nullable String getNexusOperationRunId() { + return runId; + } + + @Override + public NexusOperationExecutionDescription describe() { + DescribeNexusOperationExecutionInput input = + new DescribeNexusOperationExecutionInput( + operationId, runId, /* includeInput= */ false, /* includeOutcome= */ true); + DescribeNexusOperationExecutionOutput output = + interceptor.describeNexusOperationExecution(input); + return output.getDescription(); + } + + @Override + public void cancel() { + cancel(null); + } + + @Override + public void cancel(@Nullable String reason) { + interceptor.requestCancelNexusOperationExecution( + new RequestCancelNexusOperationExecutionInput(operationId, runId, reason)); + } + + @Override + public void terminate() { + terminate(null); + } + + @Override + public void terminate(@Nullable String reason) { + interceptor.terminateNexusOperationExecution( + new TerminateNexusOperationExecutionInput(operationId, runId, reason)); + } + + @Override + public R getResult(Class resultClass) { + return getResult(resultClass, null); + } + + @Override + public R getResult(Class resultClass, @Nullable Type resultType) { + try { + return getResult(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture getResultAsync(Class resultClass) { + return getResultAsync(resultClass, null); + } + + @Override + public CompletableFuture getResultAsync(Class resultClass, @Nullable Type resultType) { + return getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType); + } + + @Override + public R getResult(long timeout, TimeUnit unit, Class resultClass) + throws TimeoutException { + return getResult(timeout, unit, resultClass, null); + } + + @Override + public R getResult( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) + throws TimeoutException { + PollNexusOperationExecutionOutput out = + pollSyncUntilCompletedOrDeadline(Deadline.after(timeout, unit)); + return extractResult(out, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass) { + return getResultAsync(timeout, unit, resultClass, null); + } + + @Override + public CompletableFuture getResultAsync( + long timeout, TimeUnit unit, Class resultClass, @Nullable Type resultType) { + Deadline deadline = Deadline.after(timeout, unit); + return pollAsyncUntilCompletedOrDeadline(deadline) + .handle( + (out, e) -> { + if (e == null) { + return extractResult(out, resultClass, resultType); + } + throw mapAsyncException(e, deadline); + }); + } + + private PollNexusOperationExecutionOutput pollSyncUntilCompletedOrDeadline(Deadline deadline) + throws TimeoutException { + while (true) { + PollNexusOperationExecutionInput pollInput = + new PollNexusOperationExecutionInput( + operationId, + runId, + NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + deadline); + PollNexusOperationExecutionOutput out; + try { + out = interceptor.pollNexusOperationExecution(pollInput); + } catch (StatusRuntimeException e) { + if (deadline.isExpired() && Status.Code.DEADLINE_EXCEEDED.equals(e.getStatus().getCode())) { + throw new TimeoutException("getResult timed out before the operation completed"); + } + throw e; + } + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return out; + } + } + } + + private CompletableFuture pollAsyncUntilCompletedOrDeadline( + Deadline deadline) { + PollNexusOperationExecutionInput pollInput = + new PollNexusOperationExecutionInput( + operationId, + runId, + NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + deadline); + CompletableFuture pollFuture; + try { + pollFuture = interceptor.pollNexusOperationExecutionAsync(pollInput); + } catch (Throwable t) { + pollFuture = new CompletableFuture<>(); + pollFuture.completeExceptionally(t); + } + return pollFuture.thenCompose( + out -> { + if (out.getWaitStage() == NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED) { + return CompletableFuture.completedFuture(out); + } + return pollAsyncUntilCompletedOrDeadline(deadline); + }); + } + + private static CompletionException mapAsyncException(Throwable e, Deadline deadline) { + Throwable cause = e instanceof CompletionException ? e.getCause() : e; + if (deadline.isExpired() + && cause instanceof StatusRuntimeException + && Status.Code.DEADLINE_EXCEEDED.equals( + ((StatusRuntimeException) cause).getStatus().getCode())) { + return new CompletionException( + new TimeoutException("getResultAsync timed out before the operation completed")); + } + return e instanceof CompletionException ? (CompletionException) e : new CompletionException(e); + } + + private R extractResult( + PollNexusOperationExecutionOutput out, Class resultClass, @Nullable Type resultType) { + Optional failure = out.getFailure(); + if (failure.isPresent()) { + throw dataConverter.failureToException(failure.get()); + } + Optional payload = out.getResult(); + if (!payload.isPresent()) { + return null; + } + return dataConverter.fromPayload( + payload.get(), resultClass, resultType != null ? resultType : resultClass); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java new file mode 100644 index 000000000..262fd368f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootNexusClientInvoker.java @@ -0,0 +1,233 @@ +package io.temporal.internal.client; + +import io.temporal.api.sdk.v1.UserMetadata; +import io.temporal.api.workflowservice.v1.CountNexusOperationExecutionsRequest; +import io.temporal.api.workflowservice.v1.CountNexusOperationExecutionsResponse; +import io.temporal.api.workflowservice.v1.DeleteNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.ListNexusOperationExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListNexusOperationExecutionsResponse; +import io.temporal.api.workflowservice.v1.PollNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.PollNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.RequestCancelNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.StartNexusOperationExecutionRequest; +import io.temporal.api.workflowservice.v1.StartNexusOperationExecutionResponse; +import io.temporal.api.workflowservice.v1.TerminateNexusOperationExecutionRequest; +import io.temporal.client.NexusClientOptions; +import io.temporal.client.NexusOperationExecutionDescription; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.common.Experimental; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.common.ProtobufTimeUtils; +import io.temporal.internal.common.WorkflowExecutionUtils; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Root implementation of {@link NexusClientCallsInterceptor} that converts the SDK's Java DTOs into + * proto requests and delegates the actual gRPC calls to {@link GenericWorkflowClient}. + */ +@Experimental +public class RootNexusClientInvoker implements NexusClientCallsInterceptor { + + private final GenericWorkflowClient genericClient; + private final NexusClientOptions clientOptions; + + public RootNexusClientInvoker( + GenericWorkflowClient genericClient, NexusClientOptions clientOptions) { + this.genericClient = genericClient; + this.clientOptions = clientOptions; + } + + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + StartNexusOperationOptions options = input.getOptions(); + String operationId = options.getId() != null ? options.getId() : UUID.randomUUID().toString(); + StartNexusOperationExecutionRequest.Builder request = + StartNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setIdentity(clientOptions.getIdentity()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(operationId) + .setEndpoint(input.getEndpoint()) + .setService(input.getService()) + .setOperation(input.getOperation()); + // Ensure that the headers are lowercase. + input.getHeaders().forEach((k, v) -> request.putNexusHeader(k.toLowerCase(), v)); + + if (options.getScheduleToCloseTimeout() != null) { + request.setScheduleToCloseTimeout( + ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout())); + } + if (options.getScheduleToStartTimeout() != null) { + request.setScheduleToStartTimeout( + ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout())); + } + if (options.getStartToCloseTimeout() != null) { + request.setStartToCloseTimeout( + ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout())); + } + input.getInput().ifPresent(request::setInput); + if (options.getTypedSearchAttributes() != null) { + request.setSearchAttributes( + io.temporal.internal.common.SearchAttributesUtil.encodeTyped( + options.getTypedSearchAttributes())); + } + if (options.getIdReusePolicy() != null) { + request.setIdReusePolicy(options.getIdReusePolicy()); + } + if (options.getIdConflictPolicy() != null) { + request.setIdConflictPolicy(options.getIdConflictPolicy()); + } + if (options.getSummary() != null) { + UserMetadata metadata = + WorkflowExecutionUtils.makeUserMetaData( + options.getSummary(), /* details= */ null, clientOptions.getDataConverter()); + if (metadata != null) { + request.setUserMetadata(metadata); + } + } + + StartNexusOperationExecutionResponse response = + genericClient.startNexusOperationExecution(request.build()); + return new StartNexusOperationExecutionOutput( + operationId, response.getRunId(), response.getStarted()); + } + + @Override + public DescribeNexusOperationExecutionOutput describeNexusOperationExecution( + DescribeNexusOperationExecutionInput input) { + DescribeNexusOperationExecutionRequest request = buildDescribeRequest(input); + DescribeNexusOperationExecutionResponse response = + genericClient.describeNexusOperationExecution(request); + return new DescribeNexusOperationExecutionOutput( + new NexusOperationExecutionDescription( + response, clientOptions.getDataConverter(), clientOptions.getNamespace())); + } + + private DescribeNexusOperationExecutionRequest buildDescribeRequest( + DescribeNexusOperationExecutionInput input) { + DescribeNexusOperationExecutionRequest.Builder request = + DescribeNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()) + .setIncludeInput(input.isIncludeInput()) + .setIncludeOutcome(input.isIncludeOutcome()); + input.getRunId().ifPresent(request::setRunId); + return request.build(); + } + + @Override + public PollNexusOperationExecutionOutput pollNexusOperationExecution( + PollNexusOperationExecutionInput input) { + PollNexusOperationExecutionResponse response = + genericClient.pollNexusOperationExecution(buildPollRequest(input), input.getDeadline()); + return toPollOutput(response); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + PollNexusOperationExecutionInput input) { + return genericClient + .pollNexusOperationExecutionAsync(buildPollRequest(input), input.getDeadline()) + .thenApply(this::toPollOutput); + } + + private PollNexusOperationExecutionRequest buildPollRequest( + PollNexusOperationExecutionInput input) { + PollNexusOperationExecutionRequest.Builder request = + PollNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()) + .setWaitStage(input.getWaitStage()); + input.getRunId().ifPresent(request::setRunId); + return request.build(); + } + + private PollNexusOperationExecutionOutput toPollOutput( + PollNexusOperationExecutionResponse response) { + return new PollNexusOperationExecutionOutput( + response.getRunId(), + response.getWaitStage(), + response.getOperationToken(), + response.hasResult() ? response.getResult() : null, + response.hasFailure() ? response.getFailure() : null); + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + ListNexusOperationExecutionsRequest.Builder request = + ListNexusOperationExecutionsRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setPageSize(input.getPageSize()); + input.getQuery().ifPresent(request::setQuery); + input.getNextPageToken().ifPresent(request::setNextPageToken); + + ListNexusOperationExecutionsResponse response = + genericClient.listNexusOperationExecutions(request.build()); + return new ListNexusOperationExecutionsOutput( + response.getOperationsList(), response.getNextPageToken()); + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + CountNexusOperationExecutionsRequest.Builder request = + CountNexusOperationExecutionsRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()); + input.getQuery().ifPresent(request::setQuery); + + CountNexusOperationExecutionsResponse response = + genericClient.countNexusOperationExecutions(request.build()); + + java.util.List groups = + new java.util.ArrayList<>(response.getGroupsCount()); + for (CountNexusOperationExecutionsResponse.AggregationGroup g : response.getGroupsList()) { + groups.add( + new CountNexusOperationExecutionsOutput.AggregationGroup( + g.getGroupValuesList(), g.getCount())); + } + return new CountNexusOperationExecutionsOutput(response.getCount(), groups); + } + + @Override + public void requestCancelNexusOperationExecution( + RequestCancelNexusOperationExecutionInput input) { + RequestCancelNexusOperationExecutionRequest.Builder request = + RequestCancelNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setIdentity(clientOptions.getIdentity()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + input.getReason().ifPresent(request::setReason); + genericClient.requestCancelNexusOperationExecution(request.build()); + } + + @Override + public void terminateNexusOperationExecution(TerminateNexusOperationExecutionInput input) { + TerminateNexusOperationExecutionRequest.Builder request = + TerminateNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setIdentity(clientOptions.getIdentity()) + .setRequestId(UUID.randomUUID().toString()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + input.getReason().ifPresent(request::setReason); + genericClient.terminateNexusOperationExecution(request.build()); + } + + @Override + public void deleteNexusOperationExecution(DeleteNexusOperationExecutionInput input) { + DeleteNexusOperationExecutionRequest.Builder request = + DeleteNexusOperationExecutionRequest.newBuilder() + .setNamespace(clientOptions.getNamespace()) + .setOperationId(input.getOperationId()); + input.getRunId().ifPresent(request::setRunId); + genericClient.deleteNexusOperationExecution(request.build()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java index 317c2300b..648955a1c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java @@ -61,6 +61,33 @@ CompletableFuture listWorkflowExecutionsAsync( DescribeWorkflowExecutionResponse describeWorkflowExecution( DescribeWorkflowExecutionRequest request); + StartNexusOperationExecutionResponse startNexusOperationExecution( + @Nonnull StartNexusOperationExecutionRequest request); + + DescribeNexusOperationExecutionResponse describeNexusOperationExecution( + @Nonnull DescribeNexusOperationExecutionRequest request); + + PollNexusOperationExecutionResponse pollNexusOperationExecution( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + CompletableFuture pollNexusOperationExecutionAsync( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline); + + ListNexusOperationExecutionsResponse listNexusOperationExecutions( + @Nonnull ListNexusOperationExecutionsRequest request); + + CountNexusOperationExecutionsResponse countNexusOperationExecutions( + @Nonnull CountNexusOperationExecutionsRequest request); + + RequestCancelNexusOperationExecutionResponse requestCancelNexusOperationExecution( + @Nonnull RequestCancelNexusOperationExecutionRequest request); + + TerminateNexusOperationExecutionResponse terminateNexusOperationExecution( + @Nonnull TerminateNexusOperationExecutionRequest request); + + DeleteNexusOperationExecutionResponse deleteNexusOperationExecution( + @Nonnull DeleteNexusOperationExecutionRequest request); + @Experimental @Deprecated UpdateWorkerBuildIdCompatibilityResponse updateWorkerBuildIdCompatability( @@ -75,9 +102,6 @@ ExecuteMultiOperationResponse executeMultiOperation( @Experimental StartActivityExecutionResponse startActivity(StartActivityExecutionRequest request); - @Experimental - PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request); - @Experimental PollActivityExecutionResponse pollActivity( PollActivityExecutionRequest request, @Nonnull Deadline deadline); @@ -95,9 +119,6 @@ CompletableFuture pollActivityAsync( @Experimental void terminateActivity(TerminateActivityExecutionRequest request); - @Experimental - ListActivityExecutionsResponse listActivities(ListActivityExecutionsRequest request); - @Experimental CompletableFuture listActivitiesAsync( ListActivityExecutionsRequest request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index 58ad1e8f1..23f6abd98 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -309,6 +309,122 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution( grpcRetryerOptions); } + // TODO -- EVAN -- START + @Override + public StartNexusOperationExecutionResponse startNexusOperationExecution( + @Nonnull StartNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .startNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public DescribeNexusOperationExecutionResponse describeNexusOperationExecution( + @Nonnull DescribeNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .describeNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public PollNexusOperationExecutionResponse pollNexusOperationExecution( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .pollNexusOperationExecution(request), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public CompletableFuture pollNexusOperationExecutionAsync( + @Nonnull PollNexusOperationExecutionRequest request, @Nonnull Deadline deadline) { + return grpcRetryer.retryWithResultAsync( + asyncThrottlerExecutor, + () -> + toCompletableFuture( + service + .futureStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) + .withDeadline(deadline) + .pollNexusOperationExecution(request)), + new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline)); + } + + @Override + public ListNexusOperationExecutionsResponse listNexusOperationExecutions( + @Nonnull ListNexusOperationExecutionsRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .listNexusOperationExecutions(request), + grpcRetryerOptions); + } + + @Override + public CountNexusOperationExecutionsResponse countNexusOperationExecutions( + @Nonnull CountNexusOperationExecutionsRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .countNexusOperationExecutions(request), + grpcRetryerOptions); + } + + @Override + public RequestCancelNexusOperationExecutionResponse requestCancelNexusOperationExecution( + @Nonnull RequestCancelNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .requestCancelNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public TerminateNexusOperationExecutionResponse terminateNexusOperationExecution( + @Nonnull TerminateNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .terminateNexusOperationExecution(request), + grpcRetryerOptions); + } + + @Override + public DeleteNexusOperationExecutionResponse deleteNexusOperationExecution( + @Nonnull DeleteNexusOperationExecutionRequest request) { + return grpcRetryer.retryWithResult( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .deleteNexusOperationExecution(request), + grpcRetryerOptions); + } + + // TODO -- EVAN -- END private static CompletableFuture toCompletableFuture( ListenableFuture listenableFuture) { CompletableFuture result = new CompletableFuture<>(); @@ -440,18 +556,6 @@ public StartActivityExecutionResponse startActivity(StartActivityExecutionReques grpcRetryerOptions); } - @Override - public PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request) { - return grpcRetryer.retryWithResult( - () -> - service - .blockingStub() - .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) - .withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true) - .pollActivityExecution(request), - new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, null)); - } - @Override public PollActivityExecutionResponse pollActivity( PollActivityExecutionRequest request, @Nonnull Deadline deadline) { @@ -516,17 +620,6 @@ public void terminateActivity(TerminateActivityExecutionRequest request) { grpcRetryerOptions); } - @Override - public ListActivityExecutionsResponse listActivities(ListActivityExecutionsRequest request) { - return grpcRetryer.retryWithResult( - () -> - service - .blockingStub() - .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) - .listActivityExecutions(request), - grpcRetryerOptions); - } - @Override public CompletableFuture listActivitiesAsync( ListActivityExecutionsRequest request) { diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java new file mode 100644 index 000000000..fc61e0b56 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientInterceptorChainTest.java @@ -0,0 +1,119 @@ +package io.temporal.client.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientImpl; +import io.temporal.client.NexusClientOptions; +import io.temporal.client.NexusOperationExecutionCount; +import io.temporal.common.interceptors.NexusClientCallsInterceptor; +import io.temporal.common.interceptors.NexusClientCallsInterceptorBase; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies that user-registered {@link NexusClientInterceptor}s are wrapped around the root invoker + * in registration order (last registered = outermost), and that every per-call operation passes + * through every interceptor. + */ +public class NexusClientInterceptorChainTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setTestTimeoutSeconds(60) + .build(); + + @BeforeClass + public static void requireExternalService() { + // The time-skipping test server does not implement standalone Nexus operation RPCs. + assumeTrue( + "standalone Nexus operations require a real server", + SDKTestWorkflowRule.useExternalService); + } + + @Test + public void registeredInterceptorsAreCalledInOrder() { + List calls = Collections.synchronizedList(new ArrayList<>()); + NexusClientInterceptor first = next -> new RecordingCallsInterceptor("first", next, calls); + NexusClientInterceptor second = next -> new RecordingCallsInterceptor("second", next, calls); + + NexusClient client = + NexusClientImpl.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setInterceptors(Arrays.asList(first, second)) + .build()); + + // Stream is lazy; consume it to force a single page fetch through the interceptor chain. + long ignoredListCount = client.listNexusOperationExecutions(null).count(); + NexusOperationExecutionCount ignoredCount = client.countNexusOperationExecutions(null); + Assert.assertNotNull(ignoredCount); + Assert.assertTrue(ignoredListCount >= 0); + + // [first, second] -> second wraps first wraps root. + // A call enters second, descends to first, then root, returns through first then second. + Assert.assertEquals( + Arrays.asList( + "second:list:before", + "first:list:before", + "first:list:after", + "second:list:after", + "second:count:before", + "first:count:before", + "first:count:after", + "second:count:after"), + calls); + } + + static class RecordingCallsInterceptor extends NexusClientCallsInterceptorBase { + private final String name; + private final List calls; + + RecordingCallsInterceptor(String name, NexusClientCallsInterceptor next, List calls) { + super(next); + this.name = name; + this.calls = calls; + } + + @Override + public ListNexusOperationExecutionsOutput listNexusOperationExecutions( + ListNexusOperationExecutionsInput input) { + calls.add(name + ":list:before"); + try { + return super.listNexusOperationExecutions(input); + } finally { + calls.add(name + ":list:after"); + } + } + + @Override + public CountNexusOperationExecutionsOutput countNexusOperationExecutions( + CountNexusOperationExecutionsInput input) { + calls.add(name + ":count:before"); + try { + return super.countNexusOperationExecutions(input); + } finally { + calls.add(name + ":count:after"); + } + } + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java new file mode 100644 index 000000000..4cda00056 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusClientTest.java @@ -0,0 +1,168 @@ +package io.temporal.client.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusOperationExecutionCount; +import io.temporal.client.NexusOperationExecutionMetadata; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.client.UntypedNexusOperationHandle; +import io.temporal.client.UntypedNexusServiceClient; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class NexusClientTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(NexusClientTest.PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @BeforeClass + public static void requireExternalService() { + // The time-skipping test server does not implement standalone Nexus operation RPCs. + assumeTrue( + "standalone Nexus operations require a real server", + SDKTestWorkflowRule.useExternalService); + } + + @Test + public void listNexusOperationExecutions() { + NexusClient client = testWorkflowRule.getNexusClient(); + + // Materialize the lazy stream to force at least one page fetch and ensure no exceptions. + long visited = client.listNexusOperationExecutions(null).count(); + + Assert.assertTrue("expected a non-negative count of listed operations", visited >= 0); + } + + @Test + public void countNexusOperationExecutions() { + // Just run a basic test to see if it works + countNexusOperations(); + } + + public long countNexusOperations() { + NexusClient client = testWorkflowRule.getNexusClient(); + + NexusOperationExecutionCount output = client.countNexusOperationExecutions(null); + + Assert.assertNotNull(output); + Assert.assertTrue(output.getCount() >= 0); + Assert.assertNotNull(output.getGroups()); + + return output.getCount(); + } + + @Test + public void runStandaloneNexusOperation() throws Exception { + TestNexusServiceImpl.received = new java.util.concurrent.CompletableFuture<>(); + TestNexusServiceImpl.invocationCount.set(0); + + Endpoint endpoint = testWorkflowRule.getNexusEndpoint(); + String inputValue = "ping-" + UUID.randomUUID(); + NexusClient client = testWorkflowRule.getNexusClient(); + + UntypedNexusServiceClient svcClient = + client.newUntypedNexusServiceClient( + endpoint.getSpec().getName(), + TestNexusServices.TestNexusService1.class.getSimpleName()); + StartNexusOperationOptions opts = + StartNexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build(); + UntypedNexusOperationHandle handle = svcClient.start("operation", opts, inputValue); + String operationId = handle.getNexusOperationId(); + + // Sync handler: wait for the input to land in the test side-channel; that's how we + // know the operation actually completed on the worker. + String observed; + try { + observed = TestNexusServiceImpl.received.get(60, TimeUnit.SECONDS); + } catch (java.util.concurrent.TimeoutException e) { + Assert.fail( + "Nexus handler was never invoked within 60s. invocationCount=" + + TestNexusServiceImpl.invocationCount.get()); + throw new AssertionError("unreachable"); + } + Assert.assertEquals( + "expected the Nexus handler to receive the same input we sent", inputValue, observed); + + // Poll the list until our operationId appears. This also tests that the list operation + // works correctly. + NexusOperationExecutionMetadata listed = + waitForListedOperation(client, operationId, Duration.ofSeconds(15)); + Assert.assertNotNull( + "expected operationId " + operationId + " to appear in listNexusOperationExecutions", + listed); + Assert.assertEquals(operationId, listed.getOperationId()); + Assert.assertEquals(endpoint.getSpec().getName(), listed.getEndpoint()); + Assert.assertEquals( + TestNexusServices.TestNexusService1.class.getSimpleName(), listed.getService()); + Assert.assertEquals("operation", listed.getOperation()); + + // We know count should be at least 1. + Assert.assertTrue(countNexusOperations() >= 1); + } + + private NexusOperationExecutionMetadata waitForListedOperation( + NexusClient client, String operationId, Duration timeout) throws InterruptedException { + long deadlineNanos = System.nanoTime() + timeout.toNanos(); + while (System.nanoTime() < deadlineNanos) { + NexusOperationExecutionMetadata match = + client + .listNexusOperationExecutions(null) + .filter(m -> operationId.equals(m.getOperationId())) + .findFirst() + .orElse(null); + if (match != null) { + return match; + } + Thread.sleep(500); + } + return null; + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + // CompletableFuture (not BlockingQueue) so we can record a null input — the worker may + // legitimately deliver a null payload, and we want a clean assertion failure instead of a + // NullPointerException-driven retry storm. Reassigned per test in a @Before-style reset. + static volatile java.util.concurrent.CompletableFuture received = + new java.util.concurrent.CompletableFuture<>(); + static final java.util.concurrent.atomic.AtomicInteger invocationCount = + new java.util.concurrent.atomic.AtomicInteger(); + + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> { + invocationCount.incrementAndGet(); + // complete() ignores subsequent calls, so the first delivered input wins. + received.complete(input); + return "echo:" + (input == null ? "" : input); + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusOperationHandleTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusOperationHandleTest.java new file mode 100644 index 000000000..95eb9e4ca --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusOperationHandleTest.java @@ -0,0 +1,263 @@ +package io.temporal.client.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.OperationException; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusOperationExecutionDescription; +import io.temporal.client.NexusOperationHandle; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.client.UntypedNexusOperationHandle; +import io.temporal.client.UntypedNexusServiceClient; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.UUID; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link UntypedNexusOperationHandle} per-execution lifecycle methods returned by {@link + * NexusClient#getHandle(String)}: {@code describe()}, {@code cancel()}/{@code cancel(reason)}, and + * {@code terminate()}/{@code terminate(reason)}. + */ +public class NexusOperationHandleTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + // Default is 10s; standalone Nexus dispatch + worker poll can take longer. + .setTestTimeoutSeconds(120) + .build(); + + @BeforeClass + public static void requireExternalService() { + // The time-skipping test server does not implement standalone Nexus operation RPCs. + assumeTrue( + "standalone Nexus operations require a real server", + SDKTestWorkflowRule.useExternalService); + } + + @Test + public void describeReturnsDescriptionForStartedOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + NexusOperationExecutionDescription description = handle.describe(); + + Assert.assertNotNull(description); + Assert.assertNotNull(description.getRunId()); + Assert.assertEquals(started.runId, description.getRunId()); + Assert.assertNotNull(description.getRawResponse()); + } + + @Test + public void describeWithoutRunIdTargetsLatest() { + StartedOperation started = startOperation(); + // Handle with no pinned run ID — server should resolve to the latest run. + UntypedNexusOperationHandle handle = started.client.getHandle(started.operationId); + + NexusOperationExecutionDescription description = handle.describe(); + + Assert.assertNotNull(description); + Assert.assertEquals(started.runId, description.getRunId()); + } + + @Test + public void cancelSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.cancel(); + // No exception — server accepted the cancel request. + } + + @Test + public void cancelWithReasonSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.cancel("test-cancel-reason"); + } + + @Test + public void cancelWithNullReasonSucceeds() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.cancel(null); + } + + @Test + public void terminateSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.terminate(); + } + + @Test + public void terminateWithReasonSucceedsForStartedOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.terminate("test-terminate-reason"); + } + + @Test + public void terminateWithNullReasonSucceeds() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + handle.terminate(null); + } + + @Test + public void getResultReturnsTypedResultForSyncOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle untyped = + started.client.getHandle(started.operationId, started.runId); + + String result = NexusOperationHandle.fromUntyped(untyped, String.class).getResult(); + + Assert.assertNotNull(result); + Assert.assertTrue("expected echo: prefix, got: " + result, result.startsWith("echo:ping-")); + } + + @Test + public void getResultUntypedReturnsResultForSyncOperation() { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + String result = handle.getResult(String.class); + + Assert.assertNotNull(result); + Assert.assertTrue(result.startsWith("echo:ping-")); + } + + @Test + public void getResultAsyncReturnsTypedResultForSyncOperation() throws Exception { + StartedOperation started = startOperation(); + UntypedNexusOperationHandle untyped = + started.client.getHandle(started.operationId, started.runId); + + String result = + NexusOperationHandle.fromUntyped(untyped, String.class) + .getResultAsync() + .get(60, java.util.concurrent.TimeUnit.SECONDS); + + Assert.assertNotNull(result); + Assert.assertTrue(result.startsWith("echo:ping-")); + } + + /** Holder for state used to drive a single test against one started operation. */ + private static final class StartedOperation { + final NexusClient client; + final String operationId; + final String runId; + + StartedOperation(NexusClient client, String operationId, String runId) { + this.client = client; + this.operationId = operationId; + this.runId = runId; + } + } + + private StartedOperation startOperation() { + return startOperation(null); + } + + private StartedOperation startOperation(@javax.annotation.Nullable String inputOverride) { + NexusClient client = testWorkflowRule.getNexusClient(); + Endpoint endpoint = testWorkflowRule.getNexusEndpoint(); + String inputValue = + inputOverride != null ? inputOverride : "ping-handle-test-" + UUID.randomUUID(); + + UntypedNexusServiceClient svcClient = + client.newUntypedNexusServiceClient( + endpoint.getSpec().getName(), + TestNexusServices.TestNexusService1.class.getSimpleName()); + StartNexusOperationOptions opts = + StartNexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build(); + UntypedNexusOperationHandle handle = svcClient.start("operation", opts, inputValue); + + Assert.assertNotNull("expected start to return a run ID", handle.getNexusOperationRunId()); + return new StartedOperation( + client, handle.getNexusOperationId(), handle.getNexusOperationRunId()); + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + /** Inputs starting with this prefix make the handler throw, exercising the failure path. */ + static final String FAIL_PREFIX = "FAIL:"; + + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> { + if (input != null && input.startsWith(FAIL_PREFIX)) { + // OperationException.failed = definitive failure (no retries) so the caller's + // getResult surfaces the failure instead of timing out. + throw OperationException.failed("intentional failure: " + input); + } + return "echo:" + (input == null ? "" : input); + }); + } + } + + @Test + public void getResultPropagatesOperationFailure() { + StartedOperation started = startOperation(TestNexusServiceImpl.FAIL_PREFIX + "boom"); + UntypedNexusOperationHandle handle = + started.client.getHandle(started.operationId, started.runId); + + try { + handle.getResult(String.class); + Assert.fail("expected getResult to throw because the operation handler failed"); + } catch (RuntimeException e) { + // The DataConverter wraps the proto Failure into a Java exception. Either the message + // carries the handler's reason, or one of the cause links does. + String combined = collectMessages(e); + Assert.assertTrue( + "expected exception chain to mention the handler failure, got: " + combined, + combined.contains("intentional failure")); + } + } + + private static String collectMessages(Throwable t) { + StringBuilder sb = new StringBuilder(); + for (Throwable c = t; c != null; c = c.getCause()) { + sb.append(c.getClass().getSimpleName()).append(":").append(c.getMessage()).append(" | "); + if (c.getCause() == c) { + break; + } + } + return sb.toString(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java new file mode 100644 index 000000000..309d8945c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/NexusServiceClientTest.java @@ -0,0 +1,188 @@ +package io.temporal.client.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.NexusClientOptions; +import io.temporal.client.NexusOperationHandle; +import io.temporal.client.NexusServiceClient; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.common.SearchAttributeKey; +import io.temporal.common.SearchAttributes; +import io.temporal.common.interceptors.NexusClientCallsInterceptor.StartNexusOperationExecutionInput; +import io.temporal.common.interceptors.NexusClientCallsInterceptorBase; +import io.temporal.common.interceptors.NexusClientInterceptor; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * End-to-end tests for {@link NexusServiceClient}: typed start/execute via {@link + * java.util.function.BiFunction} method references. + */ +public class NexusServiceClientTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(PlaceholderWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .setTestTimeoutSeconds(120) + .build(); + + @BeforeClass + public static void requireExternalService() { + // The time-skipping test server does not implement standalone Nexus operation RPCs. + assumeTrue( + "standalone Nexus operations require a real server", + SDKTestWorkflowRule.useExternalService); + } + + @Test + public void executeReturnsTypedResult() { + NexusServiceClient client = + buildServiceClient(testWorkflowRule.getNexusEndpoint()); + + String result = client.execute(TestNexusServices.TestNexusService1::operation, "hello"); + + Assert.assertEquals("echo:hello", result); + } + + @Test + public void startReturnsTypedHandleAndPollsResult() { + NexusServiceClient client = + buildServiceClient(testWorkflowRule.getNexusEndpoint()); + + NexusOperationHandle handle = + client.start(TestNexusServices.TestNexusService1::operation, "world"); + + Assert.assertNotNull(handle.getNexusOperationId()); + Assert.assertEquals("echo:world", handle.getResult()); + } + + @Test + public void clientSummaryIsForwardedIntoStartInput() { + AtomicReference captured = new AtomicReference<>(); + RuntimeException sentinel = new RuntimeException("captured-by-test"); + + NexusClientInterceptor recordingFactory = + next -> + new NexusClientCallsInterceptorBase(next) { + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + captured.set(input); + throw sentinel; + } + }; + + NexusServiceClient client = + NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + "summary-test-endpoint", + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setInterceptors(Collections.singletonList(recordingFactory)) + .build()); + + StartNexusOperationOptions startOptions = + StartNexusOperationOptions.newBuilder().setSummary("per-call-summary").build(); + try { + client.start(TestNexusServices.TestNexusService1::operation, "ignored", startOptions); + Assert.fail("expected sentinel to be thrown by recording interceptor"); + } catch (RuntimeException e) { + Assert.assertSame(sentinel, e); + } + + StartNexusOperationExecutionInput input = captured.get(); + Assert.assertNotNull("interceptor should have captured a start input", input); + Assert.assertEquals( + "expected summary to be forwarded to the start input", + "per-call-summary", + input.getOptions().getSummary()); + } + + @Test + public void clientSearchAttributesAreEncodedIntoStartInput() { + SearchAttributeKey customKey = SearchAttributeKey.forKeyword("CustomNexusTestKey"); + SearchAttributes attrs = SearchAttributes.newBuilder().set(customKey, "expected-value").build(); + + AtomicReference captured = new AtomicReference<>(); + RuntimeException sentinel = new RuntimeException("captured-by-test"); + + NexusClientInterceptor recordingFactory = + next -> + new NexusClientCallsInterceptorBase(next) { + @Override + public StartNexusOperationExecutionOutput startNexusOperationExecution( + StartNexusOperationExecutionInput input) { + captured.set(input); + throw sentinel; + } + }; + + NexusServiceClient client = + NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + "search-attrs-test-endpoint", + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setInterceptors(Collections.singletonList(recordingFactory)) + .build()); + + StartNexusOperationOptions startOptions = + StartNexusOperationOptions.newBuilder().setTypedSearchAttributes(attrs).build(); + try { + client.start(TestNexusServices.TestNexusService1::operation, "ignored", startOptions); + Assert.fail("expected sentinel to be thrown by recording interceptor"); + } catch (RuntimeException e) { + Assert.assertSame(sentinel, e); + } + + StartNexusOperationExecutionInput input = captured.get(); + Assert.assertNotNull("interceptor should have captured a start input", input); + SearchAttributes capturedAttrs = input.getOptions().getTypedSearchAttributes(); + Assert.assertNotNull("expected search attributes to be forwarded", capturedAttrs); + Assert.assertTrue( + "expected the custom keyword to be present", capturedAttrs.containsKey(customKey)); + Assert.assertEquals("expected-value", capturedAttrs.get(customKey)); + } + + private NexusServiceClient buildServiceClient( + Endpoint endpoint) { + return NexusServiceClient.newInstance( + TestNexusServices.TestNexusService1.class, + endpoint.getSpec().getName(), + testWorkflowRule.getWorkflowServiceStubs(), + NexusClientOptions.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()); + } + + public static class PlaceholderWorkflowImpl implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (context, details, input) -> "echo:" + (input == null ? "" : input)); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a1cf4e111..ba45f5251 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -621,7 +621,7 @@ public void completeWorkflowTask( public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) { update( ctx -> { - OnConflictOptions options = request.getOnConflictOptions(); + io.temporal.api.workflow.v1.OnConflictOptions options = request.getOnConflictOptions(); String requestId = null; List completionCallbacks = null; List links = null; diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java index 48a00afc4..792981ef4 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java @@ -13,6 +13,8 @@ import io.temporal.api.history.v1.History; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientOptions; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowQueryException; @@ -260,6 +262,18 @@ public Endpoint getNexusEndpoint() { return testWorkflowRule.getNexusEndpoint(); } + /** + * Returns a {@link NexusClient} bound to this rule's namespace and service stubs. Use for tests + * that exercise the standalone Nexus client surface. + */ + public NexusClient getNexusClient() { + return NexusClient.newInstance( + getWorkflowServiceStubs(), + NexusClientOptions.newBuilder() + .setNamespace(getWorkflowClient().getOptions().getNamespace()) + .build()); + } + public Worker getWorker() { return testWorkflowRule.getWorker(); }