diff --git a/src/hackney.erl b/src/hackney.erl index 216530b9..b0bab955 100644 --- a/src/hackney.erl +++ b/src/hackney.erl @@ -900,13 +900,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody, case Method of <<"HEAD">> -> %% HEAD responses have no body - release connection to pool - hackney_conn:release_to_pool(ConnPid), + safe_release_to_pool(ConnPid), {ok, Status, RespHeaders}; _ when WithBody -> case hackney_conn:body(ConnPid) of {ok, RespBody} -> %% Body read - release connection to pool - hackney_conn:release_to_pool(ConnPid), + %% (connection might already be closed if server closed without Content-Length) + safe_release_to_pool(ConnPid), {ok, Status, RespHeaders, RespBody}; {error, Reason} -> {error, Reason} @@ -1453,6 +1454,21 @@ proxy_type_for_scheme(_) -> http. -include("hackney_methods.hrl"). +%% @private Safe release to pool - handles the case where connection +%% might have already exited (e.g., server closed without Content-Length). +safe_release_to_pool(ConnPid) when is_pid(ConnPid) -> + case is_process_alive(ConnPid) of + true -> + try + hackney_conn:release_to_pool(ConnPid) + catch + exit:{normal, _} -> ok; + exit:{noproc, _} -> ok + end; + false -> + ok + end. + %% @doc Parse a proxy URL and extract host, port, and optional credentials. %% Supports URLs like: %% - "http://proxy.example.com:8080" diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index 1c0681fd..0dccb2e7 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -616,13 +616,23 @@ connecting(EventType, Event, Data) -> %% State: connected - Ready for requests %%==================================================================== -connected(enter, _OldState, #conn_data{idle_timeout = Timeout} = Data) -> +connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) -> + %% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed) + %% This fixes issue #544: stale connections not being detected when server closes idle connections + %% Only enable active mode when returning from a completed request cycle (receiving, streaming states) + %% NOT on initial connection from 'connecting' state - server might send data before we send request + _ = case {Socket, should_enable_active_mode(OldState)} of + {undefined, _} -> ok; + {_, false} -> ok; + {_, true} -> Transport:setopts(Socket, [{active, once}]) + end, case Timeout of infinity -> keep_state_and_data; _ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]} end; -connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) -> +connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon, + transport = Transport, socket = Socket} = Data) -> %% Reset owner to pool before notifying, to avoid deadlock %% (pool might call set_owner back, but connection is blocked here) Data2 = case PoolPid of @@ -633,6 +643,12 @@ connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mo NewMon = monitor(process, PoolPid), Data#conn_data{owner = PoolPid, owner_mon = NewMon} end, + %% Enable active mode for close detection now that connection is idle in pool + %% This fixes issue #544: detect server-initiated closes while idle + _ = case Socket of + undefined -> ok; + _ -> Transport:setopts(Socket, [{active, once}]) + end, %% Notify pool that connection is available for reuse (sync) notify_pool_available_sync(Data2), {keep_state, Data2, [{reply, From, ok}]}; @@ -644,10 +660,19 @@ connected({call, From}, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon} = {keep_state, Data#conn_data{owner = NewOwner, owner_mon = NewMon}, [{reply, From, ok}]}; -connected(cast, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon} = Data) -> +connected(cast, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon, pool_pid = PoolPid, + transport = Transport, socket = Socket} = Data) -> %% Async owner update - used by pool during checkin to avoid deadlock demonitor(OldMon, [flush]), NewMon = monitor(process, NewOwner), + %% If new owner is the pool, connection is becoming idle - enable active mode for close detection + %% This fixes issue #544: detect server-initiated closes while idle in pool + _ = case {NewOwner, PoolPid, Socket} of + {PoolPid, PoolPid, Socket} when PoolPid =/= undefined, Socket =/= undefined -> + Transport:setopts(Socket, [{active, once}]); + _ -> + ok + end, {keep_state, Data#conn_data{owner = NewOwner, owner_mon = NewMon}}; connected(state_timeout, idle_timeout, Data) -> @@ -671,11 +696,25 @@ connected({call, From}, verify_socket, #conn_data{transport = Transport, socket connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) -> %% Socket not connected {next_state, closed, Data, [{reply, From, {ok, closed}}]}; -connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket}) -> - %% Combined state and socket check - case check_socket_health(Transport, Socket) of - ok -> {keep_state_and_data, [{reply, From, {ok, connected}}]}; - {error, _} -> {keep_state_and_data, [{reply, From, {ok, closed}}]} +connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) -> + %% Check for pending close message first (from active mode) + case has_pending_close(Socket) of + true -> + %% Server closed the connection - transition to closed state + {next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]}; + false -> + %% No close message pending - check socket health + case check_socket_health(Transport, Socket) of + ok -> + %% Socket is healthy - set to passive mode for checkout + %% This ensures the socket is ready for blocking recv operations + _ = Transport:setopts(Socket, [{active, false}]), + %% Flush any data messages that arrived while in active mode + flush_socket_messages(Socket), + {keep_state_and_data, [{reply, From, {ok, connected}}]}; + {error, _} -> + {keep_state_and_data, [{reply, From, {ok, closed}}]} + end end; connected({call, From}, {upgrade_to_ssl, _SslOpts}, #conn_data{transport = hackney_ssl} = _Data) -> @@ -809,6 +848,15 @@ connected(info, {tcp_error, Socket, _Reason}, #conn_data{socket = Socket} = Data connected(info, {ssl_error, Socket, _Reason}, #conn_data{socket = Socket} = Data) -> {next_state, closed, Data#conn_data{socket = undefined}}; +%% Unexpected data received while idle (HTTP/1.1 only - HTTP/2 handled above) +%% This could be trailing data from server or protocol violation - close connection +connected(info, {tcp, Socket, _UnexpectedData}, #conn_data{socket = Socket, protocol = Protocol} = Data) + when Protocol =/= http2 -> + {next_state, closed, Data#conn_data{socket = undefined}}; +connected(info, {ssl, Socket, _UnexpectedData}, #conn_data{socket = Socket, protocol = Protocol} = Data) + when Protocol =/= http2 -> + {next_state, closed, Data#conn_data{socket = undefined}}; + %% HTTP/3 QUIC message handling connected(info, {quic, ConnRef, {stream_headers, StreamId, Headers, _Fin}}, #conn_data{h3_conn = ConnRef, h3_streams = Streams} = Data) -> @@ -852,7 +900,14 @@ connected(EventType, Event, Data) -> %% State: sending - Sending request data %%==================================================================== -sending(enter, connected, _Data) -> +sending(enter, connected, #conn_data{transport = Transport, socket = Socket}) -> + %% Set socket to passive mode for blocking send/recv operations + %% (socket was in active mode while idle in connected state) + %% Note: socket may be undefined for HTTP/3 (QUIC) connections + _ = case Socket of + undefined -> ok; + _ -> Transport:setopts(Socket, [{active, false}]) + end, keep_state_and_data; sending(internal, {send_request, Method, Path, Headers, Body}, Data) -> @@ -895,7 +950,14 @@ sending(EventType, Event, Data) -> %% State: streaming_body - Streaming request body %%==================================================================== -streaming_body(enter, connected, _Data) -> +streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket}) -> + %% Set socket to passive mode for blocking send/recv operations + %% (socket was in active mode while idle in connected state) + %% Note: socket may be undefined for HTTP/3 (QUIC) connections + _ = case Socket of + undefined -> ok; + _ -> Transport:setopts(Socket, [{active, false}]) + end, keep_state_and_data; streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) -> @@ -1083,8 +1145,12 @@ receiving(internal, do_recv_response_async, Data) -> receiving({call, From}, body, Data) -> %% Read full body case read_full_body(Data, <<>>) of + {ok, Body, #conn_data{socket = undefined} = NewData} -> + %% Socket was closed during body read (e.g., no Content-Length) + %% Transition to closed state instead of connected + {next_state, closed, NewData, [{reply, From, {ok, Body}}]}; {ok, Body, NewData} -> - %% Return to connected state + %% Socket still valid - return to connected state {next_state, connected, NewData, [{reply, From, {ok, Body}}]}; {error, Reason} -> {next_state, closed, Data, [{reply, From, {error, Reason}}]} @@ -1095,6 +1161,9 @@ receiving({call, From}, stream_body, Data) -> case stream_body_chunk(Data) of {ok, Chunk, NewData} -> {keep_state, NewData, [{reply, From, {ok, Chunk}}]}; + {done, #conn_data{socket = undefined} = NewData} -> + %% Socket was closed during body read - transition to closed state + {next_state, closed, NewData, [{reply, From, done}]}; {done, NewData} -> {next_state, connected, NewData, [{reply, From, done}]}; {error, Reason} -> @@ -1244,14 +1313,20 @@ streaming_once(EventType, Event, Data) -> %% State: closed - Connection terminated %%==================================================================== -closed(enter, _OldState, #conn_data{socket = Socket, transport = Transport} = Data) -> +closed(enter, _OldState, #conn_data{socket = Socket, transport = Transport, pool_pid = PoolPid} = Data) -> %% Close socket if still open case Socket of undefined -> ok; _ -> Transport:close(Socket) end, - %% Could stop here or allow reconnection - {keep_state, Data#conn_data{socket = undefined}}; + %% For pooled connections, stop the process so pool can clean up + %% For non-pooled connections, stay alive to allow reconnection + case PoolPid of + undefined -> + {keep_state, Data#conn_data{socket = undefined}}; + _ -> + {stop, normal, Data#conn_data{socket = undefined}} + end; closed({call, From}, connect, Data) -> %% Allow reconnection from closed state @@ -1663,7 +1738,8 @@ stream_body_chunk(#conn_data{parser = Parser, transport = Transport, socket = So {ok, RecvData} -> stream_body_chunk_result(hackney_http:execute(NewParser, RecvData), Data); {error, closed} -> - {done, Data}; + %% Connection closed by server - mark socket as undefined + {done, Data#conn_data{socket = undefined}}; {error, Reason} -> {error, Reason} end; @@ -1674,7 +1750,8 @@ stream_body_chunk(#conn_data{parser = Parser, transport = Transport, socket = So %% Execute with new data and handle result stream_body_chunk_result(hackney_http:execute(NewParser, RecvData), Data); {error, closed} -> - {done, Data}; + %% Connection closed by server - mark socket as undefined + {done, Data#conn_data{socket = undefined}}; {error, Reason} -> {error, Reason} end; @@ -1706,23 +1783,58 @@ stream_body_chunk_result({error, Reason}, _Data) -> recv_data(#conn_data{transport = Transport, socket = Socket, recv_timeout = Timeout}) -> Transport:recv(Socket, 0, Timeout). +%% @private Determine if we should enable active mode when entering connected state +%% We only want active mode for close detection when the connection is truly idle +%% (returning from a completed request), not on initial connection where the server +%% might send data before we send our request (as in pipelining or some test scenarios). +should_enable_active_mode(receiving) -> true; +should_enable_active_mode(streaming) -> true; +should_enable_active_mode(streaming_once) -> true; +should_enable_active_mode(closed) -> true; %% Reconnection from closed state +should_enable_active_mode(_) -> false. + %% @private Check if socket is healthy (not closed by peer) +%% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed +%% messages to detect server closes. This function now uses peername to verify the +%% socket is still valid without conflicting with active mode. check_socket_health(Transport, Socket) -> - %% Use a non-blocking recv with 0 timeout to check for pending close - case Transport:recv(Socket, 0, 0) of - {error, timeout} -> - %% No data pending, socket is healthy + %% Use peername to check if socket is still connected + %% This works regardless of active/passive mode + Module = case Transport of + hackney_ssl -> ssl; + hackney_tcp -> inet; + _ -> inet + end, + case Module:peername(Socket) of + {ok, _} -> + %% Socket is connected ok; - {error, closed} -> - %% Socket is closed - {error, closed}; {error, Reason} -> - %% Some other error - {error, Reason}; - {ok, _ExtraData} -> - %% Unexpected data - could be server closing with data - %% Treat as ok for now - the next request will handle it - ok + %% Socket error (closed, einval, etc.) + {error, Reason} + end. + +%% @private Check for pending close message in mailbox +%% Returns true if a close message is waiting, false otherwise. +has_pending_close(Socket) -> + receive + {tcp_closed, Socket} -> true; + {ssl_closed, Socket} -> true; + {tcp_error, Socket, _} -> true; + {ssl_error, Socket, _} -> true + after 0 -> + false + end. + +%% @private Flush any socket data messages that arrived while in active mode +%% This is called after setting the socket to passive mode. +%% Note: Close messages are checked separately via has_pending_close/1. +flush_socket_messages(Socket) -> + receive + {tcp, Socket, _Data} -> flush_socket_messages(Socket); + {ssl, Socket, _Data} -> flush_socket_messages(Socket) + after 0 -> + ok end. %% @private Notify pool that connection is available for reuse (async) diff --git a/test/hackney_pool_tests.erl b/test/hackney_pool_tests.erl index 987a18fb..9821910a 100644 --- a/test/hackney_pool_tests.erl +++ b/test/hackney_pool_tests.erl @@ -45,7 +45,8 @@ hackney_pool_integration_test_() -> {"checkin resets owner to pool", fun test_checkin_resets_owner/0}, {"prewarm creates connections", fun test_prewarm/0}, {"queue timeout", {timeout, 120, fun test_queue_timeout/0}}, - {"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}} + {"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}}, + {"server close detected when idle (issue #544)", {timeout, 30, fun test_server_close_detected/0}} ]}. setup_unit() -> @@ -350,3 +351,62 @@ test_checkout_timeout() -> ?assertEqual(Error, checkout_timeout) end, hackney_pool:stop_pool(pool_test_timeout). + +%% Test for issue #544: Server closes idle connection, pool should detect it +%% This tests that when a server closes a connection while it's idle in the pool, +%% the connection process receives tcp_closed and terminates, removing itself from pool. +test_server_close_detected() -> + %% Start a simple TCP server that accepts a connection, responds, then closes after delay + Self = self(), + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]), + {ok, ServerPort} = inet:port(ListenSock), + + %% Server process + ServerPid = spawn_link(fun() -> + {ok, ClientSock} = gen_tcp:accept(ListenSock, 5000), + Self ! {server, accepted}, + %% Wait for client to signal ready + receive ready -> ok end, + %% Close from server side - this triggers tcp_closed on client + gen_tcp:close(ClientSock), + Self ! {server, closed}, + gen_tcp:close(ListenSock) + end), + + %% Create a pool and checkout a connection to our test server + ok = hackney_pool:start_pool(test_pool_server_close, [{pool_size, 5}]), + Opts = [{pool, test_pool_server_close}], + + %% Connect to the test server + {ok, PoolInfo, ConnPid} = hackney_pool:checkout("127.0.0.1", ServerPort, hackney_tcp, Opts), + ?assert(is_process_alive(ConnPid)), + + %% Wait for server to accept + receive {server, accepted} -> ok after 5000 -> error(timeout_accept) end, + + %% Check the connection back into the pool + ok = hackney_pool:checkin(PoolInfo, ConnPid), + timer:sleep(50), + + %% Verify connection is in the pool + Stats1 = hackney_pool:get_stats(test_pool_server_close), + FreeCount1 = proplists:get_value(free_count, Stats1), + ?assertEqual(1, FreeCount1), + ?assert(is_process_alive(ConnPid)), + + %% Tell server to close the connection + ServerPid ! ready, + receive {server, closed} -> ok after 5000 -> error(timeout_close) end, + + %% Give time for tcp_closed to be delivered and processed + timer:sleep(150), + + %% Connection process should have terminated (received tcp_closed, transitioned to closed) + ?assertNot(is_process_alive(ConnPid)), + + %% Connection should have been removed from pool + Stats2 = hackney_pool:get_stats(test_pool_server_close), + FreeCount2 = proplists:get_value(free_count, Stats2), + ?assertEqual(0, FreeCount2), + + ok = hackney_pool:stop_pool(test_pool_server_close).