Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
public class InternalNexusOperationContext {
private final String namespace;
private final String taskQueue;
private final String endpoint;
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
Link startWorkflowResponseLink;

public InternalNexusOperationContext(
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
String namespace,
String taskQueue,
String endpoint,
Scope metricScope,
WorkflowClient client) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.endpoint = endpoint;
this.metricScope = metricScope;
this.client = client;
}
Expand All @@ -39,6 +45,10 @@ public String getNamespace() {
return namespace;
}

public String getEndpoint() {
return endpoint;
}

public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) {
this.outboundCalls = outboundCalls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
class NexusInfoImpl implements NexusOperationInfo {
private final String namespace;
private final String taskQueue;
private final String endpoint;

NexusInfoImpl(String namespace, String taskQueue) {
NexusInfoImpl(String namespace, String taskQueue, String endpoint) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.endpoint = endpoint;
}

@Override
Expand All @@ -20,4 +22,9 @@ public String getNamespace() {
public String getTaskQueue() {
return taskQueue;
}

@Override
public String getEndpoint() {
return endpoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}

CurrentNexusOperationContext.set(
new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client));
new InternalNexusOperationContext(
namespace, taskQueue, request.getEndpoint(), metricsScope, client));

switch (request.getVariantCase()) {
case START_OPERATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public OperationHandler<Object, Object> intercept(
temporalNexusContext.getMetricsScope(),
temporalNexusContext.getWorkflowClient(),
new NexusInfoImpl(
temporalNexusContext.getNamespace(), temporalNexusContext.getTaskQueue())));
temporalNexusContext.getNamespace(),
temporalNexusContext.getTaskQueue(),
temporalNexusContext.getEndpoint())));
return new OperationInterceptorConverter(inboundCallsInterceptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,10 @@ public interface NexusOperationInfo {
* @return Nexus Task Queue of the worker that is executing the Nexus Operation
*/
String getTaskQueue();

/**
* @return Endpoint that the Nexus request was addressed to before being forwarded to this worker.
* Supported from server v1.30.0.
*/
String getEndpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ public class NexusOperationInfoTest {
public void testOperationHeaders() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String expectedEndpoint = testWorkflowRule.getNexusEndpoint().getSpec().getName();
Assert.assertEquals(
"UnitTest:" + testWorkflowRule.getTaskQueue(),
"UnitTest:" + testWorkflowRule.getTaskQueue() + ":" + expectedEndpoint,
workflowStub.execute(testWorkflowRule.getTaskQueue()));
}

Expand All @@ -47,7 +48,7 @@ public OperationHandler<String, String> operation() {
return OperationHandler.sync(
(context, details, input) -> {
NexusOperationInfo info = Nexus.getOperationContext().getInfo();
return info.getNamespace() + ":" + info.getTaskQueue();
return info.getNamespace() + ":" + info.getTaskQueue() + ":" + info.getEndpoint();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ private static void scheduleNexusOperation(
.setTaskToken(taskToken.toBytes())
.setRequest(
io.temporal.api.nexus.v1.Request.newBuilder()
.setEndpoint(attr.getEndpoint())
.setScheduledTime(ctx.currentTime())
.setCapabilities(
io.temporal.api.nexus.v1.Request.Capabilities.newBuilder()
Expand Down Expand Up @@ -998,6 +999,7 @@ private static void requestCancelNexusOperation(
.setTaskToken(taskToken.toBytes())
.setRequest(
io.temporal.api.nexus.v1.Request.newBuilder()
.setEndpoint(data.scheduledEvent.getEndpoint())
.putAllHeader(data.scheduledEvent.getNexusHeaderMap())
.setCancelOperation(
CancelOperationRequest.newBuilder()
Expand Down
Loading