diff --git a/NEWS.md b/NEWS.md index d1bdaf37..ad8a50c5 100644 --- a/NEWS.md +++ b/NEWS.md @@ -46,6 +46,7 @@ See [Migration Guide](guides/MIGRATION.md) and [Design Guide](guides/design.md) - fix: detect server-initiated closes on idle pooled connections (#544) - fix: respect recv_timeout during proxy CONNECT handshake - fix: prevent SOCKS5 and HTTP CONNECT tunnels from being pooled (#797) +- fix: auto-release connections to pool when body reading completes (connection leak fix) ### Security diff --git a/src/hackney.erl b/src/hackney.erl index 1804f5f8..9ea0bf5b 100644 --- a/src/hackney.erl +++ b/src/hackney.erl @@ -550,6 +550,7 @@ send_request(ConnPid, {Method, Path, Headers, Body}) when is_pid(ConnPid) -> %%==================================================================== %% @doc Get the full response body. +%% After reading the body, the connection is automatically released back to the pool. -spec body(conn()) -> {ok, binary()} | {error, term()}. body(ConnPid) when is_pid(ConnPid) -> hackney_conn:body(ConnPid). @@ -559,6 +560,8 @@ body(ConnPid, Timeout) when is_pid(ConnPid) -> hackney_conn:body(ConnPid, Timeout). %% @doc Stream the response body in chunks. +%% Returns {ok, Data} for each chunk, done when complete, or {error, Reason}. +%% When done is returned, the connection is automatically released back to the pool. -spec stream_body(conn()) -> {ok, binary()} | done | {error, term()}. stream_body(ConnPid) when is_pid(ConnPid) -> hackney_conn:stream_body(ConnPid). @@ -568,9 +571,9 @@ stream_body(ConnPid) when is_pid(ConnPid) -> %% can't guarantee the socket state after skipping. -spec skip_body(conn()) -> ok | {error, term()}. skip_body(ConnPid) when is_pid(ConnPid) -> - case body(ConnPid) of + case hackney_conn:body(ConnPid) of {ok, _} -> - %% Stop the connection process so pool gets DOWN message and decrements in_use + %% Body was read (connection auto-released to pool), now stop it hackney_conn:stop(ConnPid), ok; {error, Reason} -> @@ -936,15 +939,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody, {ok, Status, RespHeaders} -> case Method of <<"HEAD">> -> - %% HEAD responses have no body - release connection to pool - safe_release_to_pool(ConnPid), + %% HEAD responses have no body - call body() to trigger auto-release + %% (body returns immediately for HEAD with empty response) + _ = hackney_conn:body(ConnPid), {ok, Status, RespHeaders}; _ when WithBody -> case hackney_conn:body(ConnPid) of {ok, RespBody} -> - %% Body read - release connection to pool - %% (connection might already be closed if server closed without Content-Length) - safe_release_to_pool(ConnPid), + %% Body read - connection auto-released to pool {ok, Status, RespHeaders, RespBody}; {error, Reason} -> {error, Reason} @@ -1581,22 +1583,6 @@ proxy_type_for_scheme(_) -> http. hackney:request(Method, URL, Headers, Body, Options)). -include("hackney_methods.hrl"). - -%% @private Safe release to pool - handles the case where connection -%% might have already exited (e.g., server closed without Content-Length). -safe_release_to_pool(ConnPid) when is_pid(ConnPid) -> - case is_process_alive(ConnPid) of - true -> - try - hackney_conn:release_to_pool(ConnPid) - catch - exit:{normal, _} -> ok; - exit:{noproc, _} -> ok - end; - false -> - ok - end. - %% @doc Parse a proxy URL and extract host, port, and optional credentials. %% Supports URLs like: %% - "http://proxy.example.com:8080" diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index 0e75482c..5f950688 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -637,7 +637,8 @@ connecting(EventType, Event, Data) -> %% State: connected - Ready for requests %%==================================================================== -connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) -> +connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, + idle_timeout = Timeout, pool_pid = PoolPid} = Data) -> %% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed) %% This fixes issue #544: stale connections not being detected when server closes idle connections %% Only enable active mode when returning from a completed request cycle (receiving, streaming states) @@ -647,9 +648,20 @@ connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, id {_, false} -> ok; {_, true} -> Transport:setopts(Socket, [{active, once}]) end, + %% Auto-release to pool when body reading is complete + %% This happens when transitioning from receiving state (body fully read) + Data2 = case {PoolPid, should_auto_release(OldState)} of + {undefined, _} -> + Data; + {_, false} -> + Data; + {_, true} -> + %% Transfer ownership back to pool and notify it + auto_release_to_pool(Data) + end, case Timeout of - infinity -> keep_state_and_data; - _ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]} + infinity -> {keep_state, Data2}; + _ -> {keep_state, Data2, [{state_timeout, Timeout, idle_timeout}]} end; connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon, @@ -1930,6 +1942,23 @@ should_enable_active_mode(streaming_once) -> true; should_enable_active_mode(closed) -> true; %% Reconnection from closed state should_enable_active_mode(_) -> false. +%% @private Determine if we should auto-release to pool when entering connected state +%% We release when body reading is complete (coming from receiving state) +%% but NOT from streaming states (user is still actively streaming) +should_auto_release(receiving) -> true; +should_auto_release(_) -> false. + +%% @private Auto-release connection back to pool +%% Transfers ownership to pool and notifies it asynchronously +auto_release_to_pool(#conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) -> + %% Transfer ownership to pool + demonitor(OldMon, [flush]), + NewMon = monitor(process, PoolPid), + Data2 = Data#conn_data{owner = PoolPid, owner_mon = NewMon}, + %% Notify pool asynchronously (avoid blocking the state machine) + notify_pool_available(Data2), + Data2. + %% @private Check if socket is healthy (not closed by peer) %% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed %% messages to detect server closes. This function now uses peername to verify the diff --git a/test/hackney_pool_h2_tests.erl b/test/hackney_pool_h2_tests.erl new file mode 100644 index 00000000..62e8c21b --- /dev/null +++ b/test/hackney_pool_h2_tests.erl @@ -0,0 +1,168 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Copyright (c) 2024-2026 Benoit Chesneau +%%% +%%% @doc Tests for HTTP/2 connection pooling in hackney_pool. + +-module(hackney_pool_h2_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% Test Setup +%%==================================================================== + +setup() -> + {ok, _} = application:ensure_all_started(hackney), + ok. + +cleanup(_) -> + hackney_pool:unregister_h2_all(), + hackney_conn_sup:stop_all(), + ok. + +%%==================================================================== +%% HTTP/2 Pool Tests +%%==================================================================== + +h2_pool_test_() -> + { + "HTTP/2 pool tests", + { + setup, + fun setup/0, fun cleanup/1, + [ + {"checkout_h2 returns none when no connection", fun test_h2_checkout_none/0}, + {"register_h2 and checkout_h2", fun test_h2_register_checkout/0}, + {"unregister_h2 removes connection", fun test_h2_unregister/0}, + {"connection death cleans h2_connections", fun test_h2_connection_death/0} + ] + } + }. + +test_h2_checkout_none() -> + %% Without any registered H2 connection, checkout should return none + Result = hackney_pool:checkout_h2("test.example.com", 443, hackney_ssl, []), + ?assertEqual(none, Result). + +test_h2_register_checkout() -> + %% Start a dummy process to act as a connection + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it as an H2 connection + ok = hackney_pool:register_h2("h2test.example.com", 443, hackney_ssl, DummyConn, []), + + %% Wait a bit for the async cast to process + timer:sleep(50), + + %% Checkout should now return the connection + Result = hackney_pool:checkout_h2("h2test.example.com", 443, hackney_ssl, []), + ?assertEqual({ok, DummyConn}, Result), + + %% Cleanup + DummyConn ! stop. + +test_h2_unregister() -> + %% Start a dummy process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2unreg.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Verify it's there + ?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])), + + %% Unregister + ok = hackney_pool:unregister_h2(DummyConn, []), + timer:sleep(50), + + %% Should be gone now + ?assertEqual(none, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])), + + %% Cleanup + DummyConn ! stop. + +test_h2_connection_death() -> + %% Start a dummy process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2death.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Verify it's there + ?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])), + + %% Kill the process + DummyConn ! stop, + timer:sleep(100), + + %% Pool should receive DOWN message and clean up - checkout returns none + ?assertEqual(none, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])). + +%%==================================================================== +%% Multiplexing Tests +%%==================================================================== + +h2_multiplexing_test_() -> + { + "HTTP/2 multiplexing tests", + { + setup, + fun setup/0, fun cleanup/1, + [ + {"multiple callers get same connection", fun test_h2_multiplexing/0}, + {"different hosts get different connections", fun test_h2_different_hosts/0} + ] + } + }. + +test_h2_multiplexing() -> + %% Start a dummy connection process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2mux.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Multiple checkouts should return the same connection (multiplexing) + {ok, Conn1} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + {ok, Conn2} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + {ok, Conn3} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + + %% All should be the same connection (multiplexed) + ?assertEqual(DummyConn, Conn1), + ?assertEqual(DummyConn, Conn2), + ?assertEqual(DummyConn, Conn3), + ?assertEqual(Conn1, Conn2), + ?assertEqual(Conn2, Conn3), + + %% Cleanup + DummyConn ! stop. + +test_h2_different_hosts() -> + %% Start two dummy connections + Conn1 = spawn(fun() -> receive stop -> ok end end), + Conn2 = spawn(fun() -> receive stop -> ok end end), + + %% Register them for different hosts + ok = hackney_pool:register_h2("host1.example.com", 443, hackney_ssl, Conn1, []), + ok = hackney_pool:register_h2("host2.example.com", 443, hackney_ssl, Conn2, []), + timer:sleep(50), + + %% Checkout for host1 should return Conn1 + ?assertEqual({ok, Conn1}, hackney_pool:checkout_h2("host1.example.com", 443, hackney_ssl, [])), + + %% Checkout for host2 should return Conn2 + ?assertEqual({ok, Conn2}, hackney_pool:checkout_h2("host2.example.com", 443, hackney_ssl, [])), + + %% They should be different + ?assertNotEqual(Conn1, Conn2), + + %% Cleanup + Conn1 ! stop, + Conn2 ! stop. diff --git a/test/hackney_pool_integration_tests.erl b/test/hackney_pool_integration_tests.erl new file mode 100644 index 00000000..d0084bbb --- /dev/null +++ b/test/hackney_pool_integration_tests.erl @@ -0,0 +1,199 @@ +%% @doc Integration tests for pool connection release behavior +-module(hackney_pool_integration_tests). +-include_lib("eunit/include/eunit.hrl"). + +-define(PORT, 9877). +-define(POOL, test_pool_integration). + +%% Setup/teardown for integration tests +setup() -> + {ok, _} = application:ensure_all_started(hackney), + {ok, _} = application:ensure_all_started(cowboy), + Dispatch = cowboy_router:compile([{'_', [{"/[...]", test_http_resource, []}]}]), + {ok, _} = cowboy:start_clear(test_pool_int_http, [{port, ?PORT}], #{ + env => #{dispatch => Dispatch} + }), + %% Create a test pool with small limits for easy testing + hackney_pool:start_pool(?POOL, [{max_connections, 10}]), + ok. + +cleanup(_) -> + hackney_pool:stop_pool(?POOL), + cowboy:stop_listener(test_pool_int_http), + ok. + +url(Path) -> + <<"http://localhost:", (integer_to_binary(?PORT))/binary, Path/binary>>. + +%% ============================================================================= +%% Pool Integration Tests +%% ============================================================================= + +pool_integration_test_() -> + {setup, + fun setup/0, + fun cleanup/1, + [ + {"connection released after with_body request", fun test_with_body_release/0}, + {"connection released after manual body read", fun test_manual_body_release/0}, + {"connection released after close", fun test_close_release/0}, + {"multiple requests reuse connections", fun test_connection_reuse_integration/0}, + {"concurrent requests respect pool limits", fun test_concurrent_requests/0}, + {"connection released on error response", fun test_error_response_release/0}, + {"pool stats accurate during requests", fun test_pool_stats_accuracy/0} + ]}. + +%% Test that connection is released after with_body request +test_with_body_release() -> + %% Get initial stats + InitStats = hackney_pool:get_stats(?POOL), + InitFree = proplists:get_value(free_count, InitStats), + InitInUse = proplists:get_value(in_use_count, InitStats), + + %% Make request with with_body + {ok, 200, _Headers, _Body} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + + %% Allow time for async checkin + timer:sleep(50), + + %% Check stats after - should have one more free connection + AfterStats = hackney_pool:get_stats(?POOL), + AfterFree = proplists:get_value(free_count, AfterStats), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + + %% Connection should be returned to pool (free increased or same) + ?assert(AfterFree >= InitFree orelse AfterInUse =< InitInUse), + %% No connections should be in use + ?assertEqual(0, AfterInUse). + +%% Test that connection is released after manual body read +test_manual_body_release() -> + %% Make request without with_body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% Check stats during - connection should be in use + DuringStats = hackney_pool:get_stats(?POOL), + DuringInUse = proplists:get_value(in_use_count, DuringStats), + ?assert(DuringInUse >= 1), + + %% Read body + {ok, _Body} = hackney:body(Ref), + + %% Allow time for async checkin + timer:sleep(50), + + %% Check stats after - no connections in use + AfterStats = hackney_pool:get_stats(?POOL), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + ?assertEqual(0, AfterInUse). + +%% Test that connection is released after explicit close +test_close_release() -> + %% Make request without reading body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% Check stats during + DuringStats = hackney_pool:get_stats(?POOL), + DuringInUse = proplists:get_value(in_use_count, DuringStats), + ?assert(DuringInUse >= 1), + + %% Close without reading body + ok = hackney:close(Ref), + + %% Allow time for process cleanup + timer:sleep(50), + + %% Check stats after - no connections in use + AfterStats = hackney_pool:get_stats(?POOL), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + ?assertEqual(0, AfterInUse). + +%% Test that connections are reused +test_connection_reuse_integration() -> + %% Make first request + {ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + timer:sleep(50), + + Stats1 = hackney_pool:get_stats(?POOL), + Free1 = proplists:get_value(free_count, Stats1), + + %% Make second request - should reuse connection + {ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + timer:sleep(50), + + Stats2 = hackney_pool:get_stats(?POOL), + Free2 = proplists:get_value(free_count, Stats2), + + %% Should have same number of free connections (connection reused) + ?assertEqual(Free1, Free2). + +%% Test concurrent requests respect pool limits +test_concurrent_requests() -> + %% Start 5 concurrent requests + Self = self(), + NumRequests = 5, + + Pids = [spawn(fun() -> + Result = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + Self ! {done, self(), Result} + end) || _ <- lists:seq(1, NumRequests)], + + %% Collect results + Results = [receive {done, Pid, R} -> R end || Pid <- Pids], + + %% All should succeed + lists:foreach(fun(R) -> + ?assertMatch({ok, 200, _, _}, R) + end, Results), + + %% Allow time for cleanup + timer:sleep(100), + + %% No connections in use after all complete + Stats = hackney_pool:get_stats(?POOL), + InUse = proplists:get_value(in_use_count, Stats), + ?assertEqual(0, InUse). + +%% Test connection released after error response (4xx/5xx) +test_error_response_release() -> + %% Make request to 404 endpoint + {ok, 404, _Headers, Body} = hackney:request(get, url(<<"/not-found">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + ?assertMatch(<<"{\"error\":", _/binary>>, Body), + + timer:sleep(50), + + %% Connection should still be returned to pool + Stats = hackney_pool:get_stats(?POOL), + InUse = proplists:get_value(in_use_count, Stats), + ?assertEqual(0, InUse). + +%% Test pool stats are accurate during request lifecycle +test_pool_stats_accuracy() -> + %% Initial state + Stats0 = hackney_pool:get_stats(?POOL), + InUse0 = proplists:get_value(in_use_count, Stats0), + + %% Start request without with_body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% During request - connection in use + Stats1 = hackney_pool:get_stats(?POOL), + InUse1 = proplists:get_value(in_use_count, Stats1), + ?assertEqual(InUse0 + 1, InUse1), + + %% Read body + {ok, _Body} = hackney:body(Ref), + timer:sleep(50), + + %% After body read - connection returned + Stats2 = hackney_pool:get_stats(?POOL), + InUse2 = proplists:get_value(in_use_count, Stats2), + ?assertEqual(InUse0, InUse2).