From 5fe5ca1d90c7b5ae1016e7427b3f899a8faecd66 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sat, 17 Jan 2026 09:28:18 +0100 Subject: [PATCH 1/3] Add CompletableFuture accessors to ReactiveResponseConsumer Expose response and exchange completion as CompletableFuture for easier non-blocking integration. --- .../reactive/ReactiveResponseConsumer.java | 62 +++++- .../ReactiveCompletableFuturesExample.java | 207 ++++++++++++++++++ 2 files changed, 263 insertions(+), 6 deletions(-) create mode 100644 httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java index 196f7a1d98..029846a9cb 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import org.apache.hc.core5.annotation.Contract; @@ -62,6 +63,8 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer trailers = Collections.synchronizedList(new ArrayList<>()); private final BasicFuture>> responseFuture; + private final CompletableFuture>> responseCompletableFuture; + private final CompletableFuture responseCompletionFuture; private volatile BasicFuture responseCompletion; private volatile HttpResponse informationResponse; @@ -72,6 +75,8 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer(null); + this.responseCompletableFuture = new CompletableFuture<>(); + this.responseCompletionFuture = new CompletableFuture<>(); } /** @@ -82,12 +87,34 @@ public ReactiveResponseConsumer() { */ public ReactiveResponseConsumer(final FutureCallback>> responseCallback) { this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback")); + this.responseCompletableFuture = new CompletableFuture<>(); + this.responseCompletionFuture = new CompletableFuture<>(); } public Future>> getResponseFuture() { return responseFuture; } + /** + * Returns a {@link CompletableFuture} that completes when the response head and body {@link Publisher} + * are available. + * + * @since 5.5 + */ + public CompletableFuture>> getResponseCompletableFuture() { + return responseCompletableFuture; + } + + /** + * Returns a {@link CompletableFuture} that completes when the response exchange is complete + * (end-of-stream reached and trailers processed, if any). + * + * @since 5.5 + */ + public CompletableFuture getResponseCompletionFuture() { + return responseCompletionFuture; + } + /** * Returns the intermediate (1xx) HTTP response if one was received. * @@ -124,7 +151,11 @@ public void consumeResponse( ) { this.entityDetails = entityDetails; this.responseCompletion = new BasicFuture<>(resultCallback); - this.responseFuture.completed(new Message<>(response, reactiveDataConsumer)); + + final Message> message = new Message<>(response, reactiveDataConsumer); + this.responseFuture.completed(message); + this.responseCompletableFuture.complete(message); + if (entityDetails == null) { streamEnd(null); } @@ -139,8 +170,12 @@ public void informationResponse(final HttpResponse response, final HttpContext h public void failed(final Exception cause) { reactiveDataConsumer.failed(cause); responseFuture.failed(cause); - if (responseCompletion != null) { - responseCompletion.failed(cause); + responseCompletableFuture.completeExceptionally(cause); + responseCompletionFuture.completeExceptionally(cause); + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.failed(cause); } } @@ -160,15 +195,30 @@ public void streamEnd(final List trailers) { this.trailers.addAll(trailers); } reactiveDataConsumer.streamEnd(trailers); - responseCompletion.completed(null); + responseCompletionFuture.complete(null); + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.completed(null); + } } @Override public void releaseResources() { reactiveDataConsumer.releaseResources(); + responseFuture.cancel(); - if (responseCompletion != null) { - responseCompletion.cancel(); + + if (!responseCompletableFuture.isDone()) { + responseCompletableFuture.cancel(true); + } + if (!responseCompletionFuture.isDone()) { + responseCompletionFuture.cancel(true); + } + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.cancel(); } } } diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java new file mode 100644 index 0000000000..9dc7fb5068 --- /dev/null +++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java @@ -0,0 +1,207 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactive.examples; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.Http1StreamListener; +import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; +import org.reactivestreams.Publisher; + +/** + * Java 8 compatible client example using ReactiveResponseConsumer's new CompletableFuture API. + *

+ * Uses: + * - consumer.getResponseCompletableFuture(): response head + body publisher ready + * - consumer.getResponseCompletionFuture(): exchange fully complete + */ +public final class ReactiveCompletableFuturesExample { + + private ReactiveCompletableFuturesExample() { + } + + private static CompletableFuture withTimeout( + final CompletableFuture future, + final ScheduledExecutorService scheduler, + final long timeout, + final TimeUnit unit) { + + final CompletableFuture timeoutFuture = new CompletableFuture<>(); + final java.util.concurrent.ScheduledFuture task = scheduler.schedule( + () -> timeoutFuture.completeExceptionally(new TimeoutException("Timeout after " + timeout + " " + unit)), + timeout, unit); + + final CompletableFuture combined = future.applyToEither(timeoutFuture, t -> t); + combined.whenComplete((v, ex) -> task.cancel(false)); + return combined; + } + + private static CompletableFuture readBodyAsString(final Publisher publisher) { + final CompletableFuture bodyFuture = new CompletableFuture<>(); + + Observable.fromPublisher(publisher) + .map(buf -> { + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return new String(bytes, UTF_8); + }) + .reduce(new StringBuilder(), StringBuilder::append) + .map(StringBuilder::toString) + .subscribe( + bodyFuture::complete, + bodyFuture::completeExceptionally); + + return bodyFuture; + } + + public static void main(final String[] args) throws Exception { + String endpoint = "http://manjaro:8080/echo"; + if (args.length >= 1) { + endpoint = args[0]; + } + + final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() + .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build()) + .setStreamListener(new Http1StreamListener() { + + @Override + public void onRequestHead(final HttpConnection connection, final HttpRequest request) { + System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request)); + } + + @Override + public void onResponseHead(final HttpConnection connection, final HttpResponse response) { + System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response)); + } + + @Override + public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { + if (keepAlive) { + System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)"); + } else { + System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)"); + } + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "timeout-scheduler"); + t.setDaemon(true); + return t; + }); + + try { + final URI uri = new URI(endpoint); + + final Random random = new Random(); + final Flowable requestBody = Flowable.range(1, 100) + .map(i -> ByteBuffer.wrap((i + ":" + random.nextDouble() + "\n").getBytes(UTF_8))); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(uri) + .addHeader("X-Demo", "cfutures-java8") + .setEntity(new ReactiveEntityProducer(requestBody, -1, ContentType.TEXT_PLAIN, null)) + .build(); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null); + + final CompletableFuture printedAndDrained = consumer.getResponseCompletableFuture() + .thenCompose((final Message> streamingResponse) -> { + + final HttpResponse head = streamingResponse.getHead(); + final int code = head.getCode(); + + System.out.println(head); + for (final Header header : head.getHeaders()) { + System.out.println(header); + } + System.out.println(); + + return readBodyAsString(streamingResponse.getBody()).thenApply(body -> { + if (body != null && !body.isEmpty()) { + System.out.print(body); + if (!body.endsWith("\n")) { + System.out.println(); + } + } + if (code >= 400) { + System.out.println("Request failed: HTTP " + code + " " + head.getReasonPhrase()); + } + return null; + }); + }); + + final CompletableFuture exchangeDone = consumer.getResponseCompletionFuture(); + + final CompletableFuture both = CompletableFuture.allOf(printedAndDrained, exchangeDone); + withTimeout(both, scheduler, 60, TimeUnit.SECONDS).get(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + + } finally { + scheduler.shutdownNow(); + } + } +} From 0fac29b51cca53a6ab560194713f52dc506d5cc7 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sat, 17 Jan 2026 09:51:14 +0100 Subject: [PATCH 2/3] Add CompletionStage accessors to ReactiveResponseConsumer Expose response and exchange completion as CompletionStage views of existing CompletableFutures. --- .../reactive/ReactiveResponseConsumer.java | 33 ++- .../ReactiveClientCompletionStageExample.java | 237 ++++++++++++++++++ 2 files changed, 266 insertions(+), 4 deletions(-) create mode 100644 httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java index 029846a9cb..38febded72 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import org.apache.hc.core5.annotation.Contract; @@ -115,6 +116,26 @@ public CompletableFuture getResponseCompletionFuture() { return responseCompletionFuture; } + /** + * Returns a {@link CompletionStage} that completes when the response head and body {@link Publisher} + * are available. + * + * @since 5.5 + */ + public CompletionStage>> getResponseStage() { + return responseCompletableFuture; + } + + /** + * Returns a {@link CompletionStage} that completes when the response exchange is complete + * (end-of-stream reached and trailers processed, if any). + * + * @since 5.5 + */ + public CompletionStage getResponseCompletionStage() { + return responseCompletionFuture; + } + /** * Returns the intermediate (1xx) HTTP response if one was received. * @@ -144,10 +165,10 @@ public List

getTrailers() { @Override public void consumeResponse( - final HttpResponse response, - final EntityDetails entityDetails, - final HttpContext httpContext, - final FutureCallback resultCallback + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext, + final FutureCallback resultCallback ) { this.entityDetails = entityDetails; this.responseCompletion = new BasicFuture<>(resultCallback); @@ -169,6 +190,8 @@ public void informationResponse(final HttpResponse response, final HttpContext h @Override public void failed(final Exception cause) { reactiveDataConsumer.failed(cause); + + // Complete stage/futures regardless of whether consumeResponse() has been invoked yet. responseFuture.failed(cause); responseCompletableFuture.completeExceptionally(cause); responseCompletionFuture.completeExceptionally(cause); @@ -195,6 +218,8 @@ public void streamEnd(final List trailers) { this.trailers.addAll(trailers); } reactiveDataConsumer.streamEnd(trailers); + + // Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()). responseCompletionFuture.complete(null); final BasicFuture completion = responseCompletion; diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java new file mode 100644 index 0000000000..59a8de3219 --- /dev/null +++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java @@ -0,0 +1,237 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactive.examples; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.InetAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.Http1StreamListener; +import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; +import org.reactivestreams.Publisher; + +/** + * Client demo using CompletionStage accessors on ReactiveResponseConsumer (Java 8). + */ +public final class ReactiveClientCompletionStageExample { + + private ReactiveClientCompletionStageExample() { + } + + private static CompletionStage withTimeout( + final CompletableFuture future, + final ScheduledExecutorService scheduler, + final long timeout, + final TimeUnit unit) { + + final CompletableFuture timeoutFuture = new CompletableFuture<>(); + final java.util.concurrent.ScheduledFuture task = scheduler.schedule( + () -> timeoutFuture.completeExceptionally(new TimeoutException("Timeout after " + timeout + " " + unit)), + timeout, unit); + + final CompletableFuture combined = future.applyToEither(timeoutFuture, t -> t); + combined.whenComplete((v, ex) -> task.cancel(false)); + return combined; + } + + private static CompletableFuture readBodyAsString(final Publisher publisher) { + final CompletableFuture bodyFuture = new CompletableFuture<>(); + + Observable.fromPublisher(publisher) + .map(buf -> { + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return new String(bytes, UTF_8); + }) + .reduce(new StringBuilder(), StringBuilder::append) + .map(StringBuilder::toString) + .subscribe( + bodyFuture::complete, + bodyFuture::completeExceptionally); + + return bodyFuture; + } + + private static boolean isLoopbackHost(final String host) { + if (host == null) { + return false; + } + return "localhost".equalsIgnoreCase(host) || "127.0.0.1".equals(host) || "::1".equals(host); + } + + private static URI normalizeAuthority(final URI uri) { + if (!isLoopbackHost(uri.getHost())) { + return uri; + } + try { + final InetAddress local = InetAddress.getLocalHost(); + final String canonical = local.getCanonicalHostName(); + if (canonical != null && !canonical.isEmpty() && !isLoopbackHost(canonical)) { + return new URI(uri.getScheme(), null, canonical, uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()); + } + final String addr = local.getHostAddress(); + if (addr != null && !addr.isEmpty() && !isLoopbackHost(addr)) { + return new URI(uri.getScheme(), null, addr, uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()); + } + } catch (final Exception ignore) { + // fall back + } + return uri; + } + + public static void main(final String[] args) throws Exception { + String endpoint = "http://localhost:8080/echo"; + if (args.length >= 1) { + endpoint = args[0]; + } + + final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() + .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build()) + .setStreamListener(new Http1StreamListener() { + + @Override + public void onRequestHead(final HttpConnection connection, final HttpRequest request) { + System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request)); + } + + @Override + public void onResponseHead(final HttpConnection connection, final HttpResponse response) { + System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response)); + } + + @Override + public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { + if (keepAlive) { + System.out.println(connection.getRemoteAddress() + + " exchange completed (connection kept alive)"); + } else { + System.out.println(connection.getRemoteAddress() + + " exchange completed (connection closed)"); + } + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "timeout-scheduler"); + t.setDaemon(true); + return t; + }); + + try { + final URI uri = normalizeAuthority(new URI(endpoint)); + + final Random random = new Random(); + final Flowable requestBody = Flowable.range(1, 100) + .map(i -> ByteBuffer.wrap((i + ":" + random.nextDouble() + "\n").getBytes(UTF_8))); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(uri) + .setEntity(new ReactiveEntityProducer(requestBody, -1, ContentType.TEXT_PLAIN, null)) + .build(); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null); + + final CompletableFuture printedAndDrained = new CompletableFuture<>(); + + consumer.getResponseStage().whenComplete((msg, ex) -> { + if (ex != null) { + printedAndDrained.completeExceptionally(ex); + return; + } + try { + System.out.println(msg.getHead()); + for (final Header h : msg.getHead().getHeaders()) { + System.out.println(h); + } + System.out.println(); + + readBodyAsString(msg.getBody()).whenComplete((body, ex2) -> { + if (ex2 != null) { + printedAndDrained.completeExceptionally(ex2); + } else { + if (body != null && !body.isEmpty()) { + System.out.print(body); + if (!body.endsWith("\n")) { + System.out.println(); + } + } + printedAndDrained.complete(null); + } + }); + } catch (final RuntimeException e) { + printedAndDrained.completeExceptionally(e); + } + }); + + final CompletableFuture exchangeDone = consumer.getResponseCompletionStage().toCompletableFuture(); + + final CompletableFuture both = CompletableFuture.allOf(printedAndDrained, exchangeDone); + withTimeout(both, scheduler, 60, TimeUnit.SECONDS).toCompletableFuture().get(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + + } finally { + scheduler.shutdownNow(); + } + } +} From 95b07a943e0331f5e4a894df000016d60d20ca56 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sat, 17 Jan 2026 10:07:40 +0100 Subject: [PATCH 3/3] Expose terminal error as a CompletionStage that completes with the failure cause or null on success. --- .../reactive/ReactiveResponseConsumer.java | 39 ++++++++++++++++--- .../ReactiveClientCompletionStageExample.java | 13 ++++++- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java index 38febded72..7e997a0cbc 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; @@ -67,6 +68,12 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer>> responseCompletableFuture; private final CompletableFuture responseCompletionFuture; + /** + * Completes with {@code null} on success and with the terminal {@link Throwable} on failure. + * This future never completes exceptionally. + */ + private final CompletableFuture failureFuture; + private volatile BasicFuture responseCompletion; private volatile HttpResponse informationResponse; private volatile EntityDetails entityDetails; @@ -78,6 +85,7 @@ public ReactiveResponseConsumer() { this.responseFuture = new BasicFuture<>(null); this.responseCompletableFuture = new CompletableFuture<>(); this.responseCompletionFuture = new CompletableFuture<>(); + this.failureFuture = new CompletableFuture<>(); } /** @@ -90,6 +98,7 @@ public ReactiveResponseConsumer(final FutureCallback(Args.notNull(responseCallback, "responseCallback")); this.responseCompletableFuture = new CompletableFuture<>(); this.responseCompletionFuture = new CompletableFuture<>(); + this.failureFuture = new CompletableFuture<>(); } public Future>> getResponseFuture() { @@ -136,6 +145,16 @@ public CompletionStage getResponseCompletionStage() { return responseCompletionFuture; } + /** + * Completes with {@code null} on success and with the terminal {@link Throwable} on failure. + * This stage never completes exceptionally. + * + * @since 5.5 + */ + public CompletionStage getFailureStage() { + return failureFuture; + } + /** * Returns the intermediate (1xx) HTTP response if one was received. * @@ -165,10 +184,10 @@ public List
getTrailers() { @Override public void consumeResponse( - final HttpResponse response, - final EntityDetails entityDetails, - final HttpContext httpContext, - final FutureCallback resultCallback + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext, + final FutureCallback resultCallback ) { this.entityDetails = entityDetails; this.responseCompletion = new BasicFuture<>(resultCallback); @@ -190,12 +209,13 @@ public void informationResponse(final HttpResponse response, final HttpContext h @Override public void failed(final Exception cause) { reactiveDataConsumer.failed(cause); - - // Complete stage/futures regardless of whether consumeResponse() has been invoked yet. responseFuture.failed(cause); responseCompletableFuture.completeExceptionally(cause); responseCompletionFuture.completeExceptionally(cause); + // Record failure as a normal completion value. + failureFuture.complete(cause); + final BasicFuture completion = responseCompletion; if (completion != null) { completion.failed(cause); @@ -222,6 +242,9 @@ public void streamEnd(final List trailers) { // Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()). responseCompletionFuture.complete(null); + // Success => no failure. + failureFuture.complete(null); + final BasicFuture completion = responseCompletion; if (completion != null) { completion.completed(null); @@ -241,6 +264,10 @@ public void releaseResources() { responseCompletionFuture.cancel(true); } + if (!failureFuture.isDone()) { + failureFuture.complete(new CancellationException()); + } + final BasicFuture completion = responseCompletion; if (completion != null) { completion.cancel(); diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java index 59a8de3219..c2c25a1a79 100644 --- a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java +++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java @@ -39,8 +39,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Observable; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpConnection; @@ -60,6 +58,9 @@ import org.apache.hc.core5.util.Timeout; import org.reactivestreams.Publisher; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; + /** * Client demo using CompletionStage accessors on ReactiveResponseConsumer (Java 8). */ @@ -190,6 +191,14 @@ public void onExchangeComplete(final HttpConnection connection, final boolean ke final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null); + consumer.getFailureStage().whenComplete((t, ex) -> { + if (ex != null) { + ex.printStackTrace(); + } else if (t != null) { + System.out.println("Request failed: " + t); + } + }); + final CompletableFuture printedAndDrained = new CompletableFuture<>(); consumer.getResponseStage().whenComplete((msg, ex) -> {