Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ See [Migration Guide](guides/MIGRATION.md) and [Design Guide](guides/design.md)
- fix: detect server-initiated closes on idle pooled connections (#544)
- fix: respect recv_timeout during proxy CONNECT handshake
- fix: prevent SOCKS5 and HTTP CONNECT tunnels from being pooled (#797)
- fix: auto-release connections to pool when body reading completes (connection leak fix)

### Security

Expand Down
32 changes: 9 additions & 23 deletions src/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ send_request(ConnPid, {Method, Path, Headers, Body}) when is_pid(ConnPid) ->
%%====================================================================

%% @doc Get the full response body.
%% After reading the body, the connection is automatically released back to the pool.
-spec body(conn()) -> {ok, binary()} | {error, term()}.
body(ConnPid) when is_pid(ConnPid) ->
hackney_conn:body(ConnPid).
Expand All @@ -559,6 +560,8 @@ body(ConnPid, Timeout) when is_pid(ConnPid) ->
hackney_conn:body(ConnPid, Timeout).

%% @doc Stream the response body in chunks.
%% Returns {ok, Data} for each chunk, done when complete, or {error, Reason}.
%% When done is returned, the connection is automatically released back to the pool.
-spec stream_body(conn()) -> {ok, binary()} | done | {error, term()}.
stream_body(ConnPid) when is_pid(ConnPid) ->
hackney_conn:stream_body(ConnPid).
Expand All @@ -568,9 +571,9 @@ stream_body(ConnPid) when is_pid(ConnPid) ->
%% can't guarantee the socket state after skipping.
-spec skip_body(conn()) -> ok | {error, term()}.
skip_body(ConnPid) when is_pid(ConnPid) ->
case body(ConnPid) of
case hackney_conn:body(ConnPid) of
{ok, _} ->
%% Stop the connection process so pool gets DOWN message and decrements in_use
%% Body was read (connection auto-released to pool), now stop it
hackney_conn:stop(ConnPid),
ok;
{error, Reason} ->
Expand Down Expand Up @@ -936,15 +939,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody,
{ok, Status, RespHeaders} ->
case Method of
<<"HEAD">> ->
%% HEAD responses have no body - release connection to pool
safe_release_to_pool(ConnPid),
%% HEAD responses have no body - call body() to trigger auto-release
%% (body returns immediately for HEAD with empty response)
_ = hackney_conn:body(ConnPid),
{ok, Status, RespHeaders};
_ when WithBody ->
case hackney_conn:body(ConnPid) of
{ok, RespBody} ->
%% Body read - release connection to pool
%% (connection might already be closed if server closed without Content-Length)
safe_release_to_pool(ConnPid),
%% Body read - connection auto-released to pool
{ok, Status, RespHeaders, RespBody};
{error, Reason} ->
{error, Reason}
Expand Down Expand Up @@ -1581,22 +1583,6 @@ proxy_type_for_scheme(_) -> http.
hackney:request(Method, URL, Headers, Body, Options)).
-include("hackney_methods.hrl").


%% @private Safe release to pool - handles the case where connection
%% might have already exited (e.g., server closed without Content-Length).
safe_release_to_pool(ConnPid) when is_pid(ConnPid) ->
case is_process_alive(ConnPid) of
true ->
try
hackney_conn:release_to_pool(ConnPid)
catch
exit:{normal, _} -> ok;
exit:{noproc, _} -> ok
end;
false ->
ok
end.

%% @doc Parse a proxy URL and extract host, port, and optional credentials.
%% Supports URLs like:
%% - "http://proxy.example.com:8080"
Expand Down
35 changes: 32 additions & 3 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,8 @@ connecting(EventType, Event, Data) ->
%% State: connected - Ready for requests
%%====================================================================

connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) ->
connected(enter, OldState, #conn_data{transport = Transport, socket = Socket,
idle_timeout = Timeout, pool_pid = PoolPid} = Data) ->
%% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed)
%% This fixes issue #544: stale connections not being detected when server closes idle connections
%% Only enable active mode when returning from a completed request cycle (receiving, streaming states)
Expand All @@ -647,9 +648,20 @@ connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, id
{_, false} -> ok;
{_, true} -> Transport:setopts(Socket, [{active, once}])
end,
%% Auto-release to pool when body reading is complete
%% This happens when transitioning from receiving state (body fully read)
Data2 = case {PoolPid, should_auto_release(OldState)} of
{undefined, _} ->
Data;
{_, false} ->
Data;
{_, true} ->
%% Transfer ownership back to pool and notify it
auto_release_to_pool(Data)
end,
case Timeout of
infinity -> keep_state_and_data;
_ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]}
infinity -> {keep_state, Data2};
_ -> {keep_state, Data2, [{state_timeout, Timeout, idle_timeout}]}
end;

connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon,
Expand Down Expand Up @@ -1930,6 +1942,23 @@ should_enable_active_mode(streaming_once) -> true;
should_enable_active_mode(closed) -> true; %% Reconnection from closed state
should_enable_active_mode(_) -> false.

%% @private Determine if we should auto-release to pool when entering connected state
%% We release when body reading is complete (coming from receiving state)
%% but NOT from streaming states (user is still actively streaming)
should_auto_release(receiving) -> true;
should_auto_release(_) -> false.

%% @private Auto-release connection back to pool
%% Transfers ownership to pool and notifies it asynchronously
auto_release_to_pool(#conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) ->
%% Transfer ownership to pool
demonitor(OldMon, [flush]),
NewMon = monitor(process, PoolPid),
Data2 = Data#conn_data{owner = PoolPid, owner_mon = NewMon},
%% Notify pool asynchronously (avoid blocking the state machine)
notify_pool_available(Data2),
Data2.

%% @private Check if socket is healthy (not closed by peer)
%% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed
%% messages to detect server closes. This function now uses peername to verify the
Expand Down
168 changes: 168 additions & 0 deletions test/hackney_pool_h2_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
%%% -*- erlang -*-
%%%
%%% This file is part of hackney released under the Apache 2 license.
%%% See the NOTICE for more information.
%%%
%%% Copyright (c) 2024-2026 Benoit Chesneau
%%%
%%% @doc Tests for HTTP/2 connection pooling in hackney_pool.

-module(hackney_pool_h2_tests).

-include_lib("eunit/include/eunit.hrl").

%%====================================================================
%% Test Setup
%%====================================================================

setup() ->
{ok, _} = application:ensure_all_started(hackney),
ok.

cleanup(_) ->
hackney_pool:unregister_h2_all(),
hackney_conn_sup:stop_all(),
ok.

%%====================================================================
%% HTTP/2 Pool Tests
%%====================================================================

h2_pool_test_() ->
{
"HTTP/2 pool tests",
{
setup,
fun setup/0, fun cleanup/1,
[
{"checkout_h2 returns none when no connection", fun test_h2_checkout_none/0},
{"register_h2 and checkout_h2", fun test_h2_register_checkout/0},
{"unregister_h2 removes connection", fun test_h2_unregister/0},
{"connection death cleans h2_connections", fun test_h2_connection_death/0}
]
}
}.

test_h2_checkout_none() ->
%% Without any registered H2 connection, checkout should return none
Result = hackney_pool:checkout_h2("test.example.com", 443, hackney_ssl, []),
?assertEqual(none, Result).

test_h2_register_checkout() ->
%% Start a dummy process to act as a connection
DummyConn = spawn(fun() -> receive stop -> ok end end),

%% Register it as an H2 connection
ok = hackney_pool:register_h2("h2test.example.com", 443, hackney_ssl, DummyConn, []),

%% Wait a bit for the async cast to process
timer:sleep(50),

%% Checkout should now return the connection
Result = hackney_pool:checkout_h2("h2test.example.com", 443, hackney_ssl, []),
?assertEqual({ok, DummyConn}, Result),

%% Cleanup
DummyConn ! stop.

test_h2_unregister() ->
%% Start a dummy process
DummyConn = spawn(fun() -> receive stop -> ok end end),

%% Register it
ok = hackney_pool:register_h2("h2unreg.example.com", 443, hackney_ssl, DummyConn, []),
timer:sleep(50),

%% Verify it's there
?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])),

%% Unregister
ok = hackney_pool:unregister_h2(DummyConn, []),
timer:sleep(50),

%% Should be gone now
?assertEqual(none, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])),

%% Cleanup
DummyConn ! stop.

test_h2_connection_death() ->
%% Start a dummy process
DummyConn = spawn(fun() -> receive stop -> ok end end),

%% Register it
ok = hackney_pool:register_h2("h2death.example.com", 443, hackney_ssl, DummyConn, []),
timer:sleep(50),

%% Verify it's there
?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])),

%% Kill the process
DummyConn ! stop,
timer:sleep(100),

%% Pool should receive DOWN message and clean up - checkout returns none
?assertEqual(none, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])).

%%====================================================================
%% Multiplexing Tests
%%====================================================================

h2_multiplexing_test_() ->
{
"HTTP/2 multiplexing tests",
{
setup,
fun setup/0, fun cleanup/1,
[
{"multiple callers get same connection", fun test_h2_multiplexing/0},
{"different hosts get different connections", fun test_h2_different_hosts/0}
]
}
}.

test_h2_multiplexing() ->
%% Start a dummy connection process
DummyConn = spawn(fun() -> receive stop -> ok end end),

%% Register it
ok = hackney_pool:register_h2("h2mux.example.com", 443, hackney_ssl, DummyConn, []),
timer:sleep(50),

%% Multiple checkouts should return the same connection (multiplexing)
{ok, Conn1} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []),
{ok, Conn2} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []),
{ok, Conn3} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []),

%% All should be the same connection (multiplexed)
?assertEqual(DummyConn, Conn1),
?assertEqual(DummyConn, Conn2),
?assertEqual(DummyConn, Conn3),
?assertEqual(Conn1, Conn2),
?assertEqual(Conn2, Conn3),

%% Cleanup
DummyConn ! stop.

test_h2_different_hosts() ->
%% Start two dummy connections
Conn1 = spawn(fun() -> receive stop -> ok end end),
Conn2 = spawn(fun() -> receive stop -> ok end end),

%% Register them for different hosts
ok = hackney_pool:register_h2("host1.example.com", 443, hackney_ssl, Conn1, []),
ok = hackney_pool:register_h2("host2.example.com", 443, hackney_ssl, Conn2, []),
timer:sleep(50),

%% Checkout for host1 should return Conn1
?assertEqual({ok, Conn1}, hackney_pool:checkout_h2("host1.example.com", 443, hackney_ssl, [])),

%% Checkout for host2 should return Conn2
?assertEqual({ok, Conn2}, hackney_pool:checkout_h2("host2.example.com", 443, hackney_ssl, [])),

%% They should be different
?assertNotEqual(Conn1, Conn2),

%% Cleanup
Conn1 ! stop,
Conn2 ! stop.
Loading
Loading