From f5211e368c877c1d7da2d0acb89263ac79287049 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 20 Jan 2026 19:58:14 +0100 Subject: [PATCH] fix: auto-release connections to pool when body reading completes Connections are now automatically released to the pool when body reading completes, rather than requiring explicit release calls. This is handled in the connection process itself (hackney_conn) when transitioning from receiving state to connected state. Changes: - hackney_conn: Add auto_release_to_pool in connected(enter, receiving, ...) - hackney_conn: Add should_auto_release/1 and auto_release_to_pool/1 helpers - hackney.erl: Remove safe_release_to_pool (no longer needed) - hackney.erl: For HEAD requests, call body() to trigger auto-release - Test: Add hackney_pool_integration_tests for HTTP/1.1 pool release - Test: Add hackney_pool_h2_tests for HTTP/2 pool behavior HTTP/2 and HTTP/3 connections use multiplexing and are handled differently: - They stay registered in the pool until the connection closes - Pool monitors them and cleans up on DOWN message - Multiple callers share the same connection --- NEWS.md | 1 + src/hackney.erl | 32 ++-- src/hackney_conn.erl | 35 ++++- test/hackney_pool_h2_tests.erl | 168 ++++++++++++++++++++ test/hackney_pool_integration_tests.erl | 199 ++++++++++++++++++++++++ 5 files changed, 409 insertions(+), 26 deletions(-) create mode 100644 test/hackney_pool_h2_tests.erl create mode 100644 test/hackney_pool_integration_tests.erl diff --git a/NEWS.md b/NEWS.md index d1bdaf37..ad8a50c5 100644 --- a/NEWS.md +++ b/NEWS.md @@ -46,6 +46,7 @@ See [Migration Guide](guides/MIGRATION.md) and [Design Guide](guides/design.md) - fix: detect server-initiated closes on idle pooled connections (#544) - fix: respect recv_timeout during proxy CONNECT handshake - fix: prevent SOCKS5 and HTTP CONNECT tunnels from being pooled (#797) +- fix: auto-release connections to pool when body reading completes (connection leak fix) ### Security diff --git a/src/hackney.erl b/src/hackney.erl index 1804f5f8..9ea0bf5b 100644 --- a/src/hackney.erl +++ b/src/hackney.erl @@ -550,6 +550,7 @@ send_request(ConnPid, {Method, Path, Headers, Body}) when is_pid(ConnPid) -> %%==================================================================== %% @doc Get the full response body. +%% After reading the body, the connection is automatically released back to the pool. -spec body(conn()) -> {ok, binary()} | {error, term()}. body(ConnPid) when is_pid(ConnPid) -> hackney_conn:body(ConnPid). @@ -559,6 +560,8 @@ body(ConnPid, Timeout) when is_pid(ConnPid) -> hackney_conn:body(ConnPid, Timeout). %% @doc Stream the response body in chunks. +%% Returns {ok, Data} for each chunk, done when complete, or {error, Reason}. +%% When done is returned, the connection is automatically released back to the pool. -spec stream_body(conn()) -> {ok, binary()} | done | {error, term()}. stream_body(ConnPid) when is_pid(ConnPid) -> hackney_conn:stream_body(ConnPid). @@ -568,9 +571,9 @@ stream_body(ConnPid) when is_pid(ConnPid) -> %% can't guarantee the socket state after skipping. -spec skip_body(conn()) -> ok | {error, term()}. skip_body(ConnPid) when is_pid(ConnPid) -> - case body(ConnPid) of + case hackney_conn:body(ConnPid) of {ok, _} -> - %% Stop the connection process so pool gets DOWN message and decrements in_use + %% Body was read (connection auto-released to pool), now stop it hackney_conn:stop(ConnPid), ok; {error, Reason} -> @@ -936,15 +939,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody, {ok, Status, RespHeaders} -> case Method of <<"HEAD">> -> - %% HEAD responses have no body - release connection to pool - safe_release_to_pool(ConnPid), + %% HEAD responses have no body - call body() to trigger auto-release + %% (body returns immediately for HEAD with empty response) + _ = hackney_conn:body(ConnPid), {ok, Status, RespHeaders}; _ when WithBody -> case hackney_conn:body(ConnPid) of {ok, RespBody} -> - %% Body read - release connection to pool - %% (connection might already be closed if server closed without Content-Length) - safe_release_to_pool(ConnPid), + %% Body read - connection auto-released to pool {ok, Status, RespHeaders, RespBody}; {error, Reason} -> {error, Reason} @@ -1581,22 +1583,6 @@ proxy_type_for_scheme(_) -> http. hackney:request(Method, URL, Headers, Body, Options)). -include("hackney_methods.hrl"). - -%% @private Safe release to pool - handles the case where connection -%% might have already exited (e.g., server closed without Content-Length). -safe_release_to_pool(ConnPid) when is_pid(ConnPid) -> - case is_process_alive(ConnPid) of - true -> - try - hackney_conn:release_to_pool(ConnPid) - catch - exit:{normal, _} -> ok; - exit:{noproc, _} -> ok - end; - false -> - ok - end. - %% @doc Parse a proxy URL and extract host, port, and optional credentials. %% Supports URLs like: %% - "http://proxy.example.com:8080" diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index 0e75482c..5f950688 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -637,7 +637,8 @@ connecting(EventType, Event, Data) -> %% State: connected - Ready for requests %%==================================================================== -connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) -> +connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, + idle_timeout = Timeout, pool_pid = PoolPid} = Data) -> %% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed) %% This fixes issue #544: stale connections not being detected when server closes idle connections %% Only enable active mode when returning from a completed request cycle (receiving, streaming states) @@ -647,9 +648,20 @@ connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, id {_, false} -> ok; {_, true} -> Transport:setopts(Socket, [{active, once}]) end, + %% Auto-release to pool when body reading is complete + %% This happens when transitioning from receiving state (body fully read) + Data2 = case {PoolPid, should_auto_release(OldState)} of + {undefined, _} -> + Data; + {_, false} -> + Data; + {_, true} -> + %% Transfer ownership back to pool and notify it + auto_release_to_pool(Data) + end, case Timeout of - infinity -> keep_state_and_data; - _ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]} + infinity -> {keep_state, Data2}; + _ -> {keep_state, Data2, [{state_timeout, Timeout, idle_timeout}]} end; connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon, @@ -1930,6 +1942,23 @@ should_enable_active_mode(streaming_once) -> true; should_enable_active_mode(closed) -> true; %% Reconnection from closed state should_enable_active_mode(_) -> false. +%% @private Determine if we should auto-release to pool when entering connected state +%% We release when body reading is complete (coming from receiving state) +%% but NOT from streaming states (user is still actively streaming) +should_auto_release(receiving) -> true; +should_auto_release(_) -> false. + +%% @private Auto-release connection back to pool +%% Transfers ownership to pool and notifies it asynchronously +auto_release_to_pool(#conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) -> + %% Transfer ownership to pool + demonitor(OldMon, [flush]), + NewMon = monitor(process, PoolPid), + Data2 = Data#conn_data{owner = PoolPid, owner_mon = NewMon}, + %% Notify pool asynchronously (avoid blocking the state machine) + notify_pool_available(Data2), + Data2. + %% @private Check if socket is healthy (not closed by peer) %% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed %% messages to detect server closes. This function now uses peername to verify the diff --git a/test/hackney_pool_h2_tests.erl b/test/hackney_pool_h2_tests.erl new file mode 100644 index 00000000..62e8c21b --- /dev/null +++ b/test/hackney_pool_h2_tests.erl @@ -0,0 +1,168 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Copyright (c) 2024-2026 Benoit Chesneau +%%% +%%% @doc Tests for HTTP/2 connection pooling in hackney_pool. + +-module(hackney_pool_h2_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% Test Setup +%%==================================================================== + +setup() -> + {ok, _} = application:ensure_all_started(hackney), + ok. + +cleanup(_) -> + hackney_pool:unregister_h2_all(), + hackney_conn_sup:stop_all(), + ok. + +%%==================================================================== +%% HTTP/2 Pool Tests +%%==================================================================== + +h2_pool_test_() -> + { + "HTTP/2 pool tests", + { + setup, + fun setup/0, fun cleanup/1, + [ + {"checkout_h2 returns none when no connection", fun test_h2_checkout_none/0}, + {"register_h2 and checkout_h2", fun test_h2_register_checkout/0}, + {"unregister_h2 removes connection", fun test_h2_unregister/0}, + {"connection death cleans h2_connections", fun test_h2_connection_death/0} + ] + } + }. + +test_h2_checkout_none() -> + %% Without any registered H2 connection, checkout should return none + Result = hackney_pool:checkout_h2("test.example.com", 443, hackney_ssl, []), + ?assertEqual(none, Result). + +test_h2_register_checkout() -> + %% Start a dummy process to act as a connection + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it as an H2 connection + ok = hackney_pool:register_h2("h2test.example.com", 443, hackney_ssl, DummyConn, []), + + %% Wait a bit for the async cast to process + timer:sleep(50), + + %% Checkout should now return the connection + Result = hackney_pool:checkout_h2("h2test.example.com", 443, hackney_ssl, []), + ?assertEqual({ok, DummyConn}, Result), + + %% Cleanup + DummyConn ! stop. + +test_h2_unregister() -> + %% Start a dummy process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2unreg.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Verify it's there + ?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])), + + %% Unregister + ok = hackney_pool:unregister_h2(DummyConn, []), + timer:sleep(50), + + %% Should be gone now + ?assertEqual(none, hackney_pool:checkout_h2("h2unreg.example.com", 443, hackney_ssl, [])), + + %% Cleanup + DummyConn ! stop. + +test_h2_connection_death() -> + %% Start a dummy process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2death.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Verify it's there + ?assertEqual({ok, DummyConn}, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])), + + %% Kill the process + DummyConn ! stop, + timer:sleep(100), + + %% Pool should receive DOWN message and clean up - checkout returns none + ?assertEqual(none, hackney_pool:checkout_h2("h2death.example.com", 443, hackney_ssl, [])). + +%%==================================================================== +%% Multiplexing Tests +%%==================================================================== + +h2_multiplexing_test_() -> + { + "HTTP/2 multiplexing tests", + { + setup, + fun setup/0, fun cleanup/1, + [ + {"multiple callers get same connection", fun test_h2_multiplexing/0}, + {"different hosts get different connections", fun test_h2_different_hosts/0} + ] + } + }. + +test_h2_multiplexing() -> + %% Start a dummy connection process + DummyConn = spawn(fun() -> receive stop -> ok end end), + + %% Register it + ok = hackney_pool:register_h2("h2mux.example.com", 443, hackney_ssl, DummyConn, []), + timer:sleep(50), + + %% Multiple checkouts should return the same connection (multiplexing) + {ok, Conn1} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + {ok, Conn2} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + {ok, Conn3} = hackney_pool:checkout_h2("h2mux.example.com", 443, hackney_ssl, []), + + %% All should be the same connection (multiplexed) + ?assertEqual(DummyConn, Conn1), + ?assertEqual(DummyConn, Conn2), + ?assertEqual(DummyConn, Conn3), + ?assertEqual(Conn1, Conn2), + ?assertEqual(Conn2, Conn3), + + %% Cleanup + DummyConn ! stop. + +test_h2_different_hosts() -> + %% Start two dummy connections + Conn1 = spawn(fun() -> receive stop -> ok end end), + Conn2 = spawn(fun() -> receive stop -> ok end end), + + %% Register them for different hosts + ok = hackney_pool:register_h2("host1.example.com", 443, hackney_ssl, Conn1, []), + ok = hackney_pool:register_h2("host2.example.com", 443, hackney_ssl, Conn2, []), + timer:sleep(50), + + %% Checkout for host1 should return Conn1 + ?assertEqual({ok, Conn1}, hackney_pool:checkout_h2("host1.example.com", 443, hackney_ssl, [])), + + %% Checkout for host2 should return Conn2 + ?assertEqual({ok, Conn2}, hackney_pool:checkout_h2("host2.example.com", 443, hackney_ssl, [])), + + %% They should be different + ?assertNotEqual(Conn1, Conn2), + + %% Cleanup + Conn1 ! stop, + Conn2 ! stop. diff --git a/test/hackney_pool_integration_tests.erl b/test/hackney_pool_integration_tests.erl new file mode 100644 index 00000000..d0084bbb --- /dev/null +++ b/test/hackney_pool_integration_tests.erl @@ -0,0 +1,199 @@ +%% @doc Integration tests for pool connection release behavior +-module(hackney_pool_integration_tests). +-include_lib("eunit/include/eunit.hrl"). + +-define(PORT, 9877). +-define(POOL, test_pool_integration). + +%% Setup/teardown for integration tests +setup() -> + {ok, _} = application:ensure_all_started(hackney), + {ok, _} = application:ensure_all_started(cowboy), + Dispatch = cowboy_router:compile([{'_', [{"/[...]", test_http_resource, []}]}]), + {ok, _} = cowboy:start_clear(test_pool_int_http, [{port, ?PORT}], #{ + env => #{dispatch => Dispatch} + }), + %% Create a test pool with small limits for easy testing + hackney_pool:start_pool(?POOL, [{max_connections, 10}]), + ok. + +cleanup(_) -> + hackney_pool:stop_pool(?POOL), + cowboy:stop_listener(test_pool_int_http), + ok. + +url(Path) -> + <<"http://localhost:", (integer_to_binary(?PORT))/binary, Path/binary>>. + +%% ============================================================================= +%% Pool Integration Tests +%% ============================================================================= + +pool_integration_test_() -> + {setup, + fun setup/0, + fun cleanup/1, + [ + {"connection released after with_body request", fun test_with_body_release/0}, + {"connection released after manual body read", fun test_manual_body_release/0}, + {"connection released after close", fun test_close_release/0}, + {"multiple requests reuse connections", fun test_connection_reuse_integration/0}, + {"concurrent requests respect pool limits", fun test_concurrent_requests/0}, + {"connection released on error response", fun test_error_response_release/0}, + {"pool stats accurate during requests", fun test_pool_stats_accuracy/0} + ]}. + +%% Test that connection is released after with_body request +test_with_body_release() -> + %% Get initial stats + InitStats = hackney_pool:get_stats(?POOL), + InitFree = proplists:get_value(free_count, InitStats), + InitInUse = proplists:get_value(in_use_count, InitStats), + + %% Make request with with_body + {ok, 200, _Headers, _Body} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + + %% Allow time for async checkin + timer:sleep(50), + + %% Check stats after - should have one more free connection + AfterStats = hackney_pool:get_stats(?POOL), + AfterFree = proplists:get_value(free_count, AfterStats), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + + %% Connection should be returned to pool (free increased or same) + ?assert(AfterFree >= InitFree orelse AfterInUse =< InitInUse), + %% No connections should be in use + ?assertEqual(0, AfterInUse). + +%% Test that connection is released after manual body read +test_manual_body_release() -> + %% Make request without with_body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% Check stats during - connection should be in use + DuringStats = hackney_pool:get_stats(?POOL), + DuringInUse = proplists:get_value(in_use_count, DuringStats), + ?assert(DuringInUse >= 1), + + %% Read body + {ok, _Body} = hackney:body(Ref), + + %% Allow time for async checkin + timer:sleep(50), + + %% Check stats after - no connections in use + AfterStats = hackney_pool:get_stats(?POOL), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + ?assertEqual(0, AfterInUse). + +%% Test that connection is released after explicit close +test_close_release() -> + %% Make request without reading body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% Check stats during + DuringStats = hackney_pool:get_stats(?POOL), + DuringInUse = proplists:get_value(in_use_count, DuringStats), + ?assert(DuringInUse >= 1), + + %% Close without reading body + ok = hackney:close(Ref), + + %% Allow time for process cleanup + timer:sleep(50), + + %% Check stats after - no connections in use + AfterStats = hackney_pool:get_stats(?POOL), + AfterInUse = proplists:get_value(in_use_count, AfterStats), + ?assertEqual(0, AfterInUse). + +%% Test that connections are reused +test_connection_reuse_integration() -> + %% Make first request + {ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + timer:sleep(50), + + Stats1 = hackney_pool:get_stats(?POOL), + Free1 = proplists:get_value(free_count, Stats1), + + %% Make second request - should reuse connection + {ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + timer:sleep(50), + + Stats2 = hackney_pool:get_stats(?POOL), + Free2 = proplists:get_value(free_count, Stats2), + + %% Should have same number of free connections (connection reused) + ?assertEqual(Free1, Free2). + +%% Test concurrent requests respect pool limits +test_concurrent_requests() -> + %% Start 5 concurrent requests + Self = self(), + NumRequests = 5, + + Pids = [spawn(fun() -> + Result = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + Self ! {done, self(), Result} + end) || _ <- lists:seq(1, NumRequests)], + + %% Collect results + Results = [receive {done, Pid, R} -> R end || Pid <- Pids], + + %% All should succeed + lists:foreach(fun(R) -> + ?assertMatch({ok, 200, _, _}, R) + end, Results), + + %% Allow time for cleanup + timer:sleep(100), + + %% No connections in use after all complete + Stats = hackney_pool:get_stats(?POOL), + InUse = proplists:get_value(in_use_count, Stats), + ?assertEqual(0, InUse). + +%% Test connection released after error response (4xx/5xx) +test_error_response_release() -> + %% Make request to 404 endpoint + {ok, 404, _Headers, Body} = hackney:request(get, url(<<"/not-found">>), [], <<>>, + [{pool, ?POOL}, {with_body, true}]), + ?assertMatch(<<"{\"error\":", _/binary>>, Body), + + timer:sleep(50), + + %% Connection should still be returned to pool + Stats = hackney_pool:get_stats(?POOL), + InUse = proplists:get_value(in_use_count, Stats), + ?assertEqual(0, InUse). + +%% Test pool stats are accurate during request lifecycle +test_pool_stats_accuracy() -> + %% Initial state + Stats0 = hackney_pool:get_stats(?POOL), + InUse0 = proplists:get_value(in_use_count, Stats0), + + %% Start request without with_body + {ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>, + [{pool, ?POOL}]), + + %% During request - connection in use + Stats1 = hackney_pool:get_stats(?POOL), + InUse1 = proplists:get_value(in_use_count, Stats1), + ?assertEqual(InUse0 + 1, InUse1), + + %% Read body + {ok, _Body} = hackney:body(Ref), + timer:sleep(50), + + %% After body read - connection returned + Stats2 = hackney_pool:get_stats(?POOL), + InUse2 = proplists:get_value(in_use_count, Stats2), + ?assertEqual(InUse0, InUse2).