Skip to content

Commit 8c2bb94

Browse files
committed
Implemented the missing EOS handling in handleResponseBodyResponse by half-closing the data plane rpc.
1 parent 4fb501c commit 8c2bb94

2 files changed

Lines changed: 7 additions & 9 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -849,16 +849,12 @@ private void handleResponseBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.
849849
if (!streamed.getBody().isEmpty()) {
850850
listener.onExternalBody(streamed.getBody());
851851
}
852-
/*
853852
if (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage()) {
854-
// Body stream from ext-proc finished, but we wait for rawCall.onClose to deliver final status.
855-
// The filter would have already sent halfClose on the dataplane rpc in response to a
856-
// ProcessingResponse for a request, with end of stream indicated in that response.
857-
// So it now has to await for onClose() rather than do anything when
858-
// (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage())
859-
// occurs in handleResponseBodyResponse.
853+
if (requestSideClosed.compareAndSet(false, true)) {
854+
super.halfClose();
855+
}
856+
listener.proceedWithClose();
860857
}
861-
*/
862858
}
863859
}
864860
}

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1718,10 +1718,12 @@ public void onNext(ProcessingRequest request) {
17181718
InProcessChannelBuilder.forName(uniqueDataPlaneServerName).directExecutor().build());
17191719

17201720
final CountDownLatch appCloseLatch = new CountDownLatch(1);
1721+
final AtomicReference<Status> capturedStatus = new AtomicReference<>();
17211722
CallOptions callOptions = CallOptions.DEFAULT.withExecutor(MoreExecutors.directExecutor());
17221723
ClientCall<String, String> proxyCall = interceptor.interceptCall(METHOD_SAY_HELLO, callOptions, dataPlaneChannel);
17231724
proxyCall.start(new ClientCall.Listener<String>() {
17241725
@Override public void onClose(Status status, Metadata trailers) {
1726+
capturedStatus.set(status);
17251727
appCloseLatch.countDown();
17261728
}
17271729
}, new Metadata());
@@ -1740,8 +1742,8 @@ public void onNext(ProcessingRequest request) {
17401742

17411743
// Verify app listener notified
17421744
assertThat(appCloseLatch.await(5, TimeUnit.SECONDS)).isTrue();
1745+
assertThat(capturedStatus.get().isOk()).isTrue();
17431746

1744-
proxyCall.cancel("Cleanup", null);
17451747
channelManager.close();
17461748
}
17471749

0 commit comments

Comments
 (0)