Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -900,13 +900,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody,
case Method of
<<"HEAD">> ->
%% HEAD responses have no body - release connection to pool
hackney_conn:release_to_pool(ConnPid),
safe_release_to_pool(ConnPid),
{ok, Status, RespHeaders};
_ when WithBody ->
case hackney_conn:body(ConnPid) of
{ok, RespBody} ->
%% Body read - release connection to pool
hackney_conn:release_to_pool(ConnPid),
%% (connection might already be closed if server closed without Content-Length)
safe_release_to_pool(ConnPid),
{ok, Status, RespHeaders, RespBody};
{error, Reason} ->
{error, Reason}
Expand Down Expand Up @@ -1453,6 +1454,21 @@ proxy_type_for_scheme(_) -> http.
-include("hackney_methods.hrl").


%% @private Hand `ConnPid' back to its pool, tolerating a connection process
%% that is already gone (e.g. the server closed the connection without sending
%% a Content-Length, so the connection process exited before the release).
%%
%% Returns `ok' without calling the connection when the process is not alive.
%% Otherwise returns whatever `hackney_conn:release_to_pool/1' returns, except
%% that an exit of the form `{normal, _}' or `{noproc, _}' raised by that call
%% is turned into `ok'. Any other exception propagates to the caller.
safe_release_to_pool(ConnPid) when is_pid(ConnPid) ->
    Alive = is_process_alive(ConnPid),
    case Alive of
        false ->
            %% Nothing left to release.
            ok;
        true ->
            %% The process can still exit between the liveness check and the
            %% call, hence the exit handlers below.
            try
                hackney_conn:release_to_pool(ConnPid)
            catch
                exit:{noproc, _} -> ok;
                exit:{normal, _} -> ok
            end
    end.

%% @doc Parse a proxy URL and extract host, port, and optional credentials.
%% Supports URLs like:
%% - "http://proxy.example.com:8080"
Expand Down
170 changes: 141 additions & 29 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,23 @@ connecting(EventType, Event, Data) ->
%% State: connected - Ready for requests
%%====================================================================

connected(enter, _OldState, #conn_data{idle_timeout = Timeout} = Data) ->
connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) ->
%% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed)
%% This fixes issue #544: stale connections not being detected when server closes idle connections
%% Only enable active mode when returning from a completed request cycle (receiving, streaming states)
%% NOT on initial connection from 'connecting' state - server might send data before we send request
_ = case {Socket, should_enable_active_mode(OldState)} of
{undefined, _} -> ok;
{_, false} -> ok;
{_, true} -> Transport:setopts(Socket, [{active, once}])
end,
case Timeout of
infinity -> keep_state_and_data;
_ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]}
end;

connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) ->
connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon,
transport = Transport, socket = Socket} = Data) ->
%% Reset owner to pool before notifying, to avoid deadlock
%% (pool might call set_owner back, but connection is blocked here)
Data2 = case PoolPid of
Expand All @@ -633,6 +643,12 @@ connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mo
NewMon = monitor(process, PoolPid),
Data#conn_data{owner = PoolPid, owner_mon = NewMon}
end,
%% Enable active mode for close detection now that connection is idle in pool
%% This fixes issue #544: detect server-initiated closes while idle
_ = case Socket of
undefined -> ok;
_ -> Transport:setopts(Socket, [{active, once}])
end,
%% Notify pool that connection is available for reuse (sync)
notify_pool_available_sync(Data2),
{keep_state, Data2, [{reply, From, ok}]};
Expand All @@ -644,10 +660,19 @@ connected({call, From}, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon} =
{keep_state, Data#conn_data{owner = NewOwner, owner_mon = NewMon},
[{reply, From, ok}]};

connected(cast, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon} = Data) ->
connected(cast, {set_owner, NewOwner}, #conn_data{owner_mon = OldMon, pool_pid = PoolPid,
transport = Transport, socket = Socket} = Data) ->
%% Async owner update - used by pool during checkin to avoid deadlock
demonitor(OldMon, [flush]),
NewMon = monitor(process, NewOwner),
%% If new owner is the pool, connection is becoming idle - enable active mode for close detection
%% This fixes issue #544: detect server-initiated closes while idle in pool
_ = case {NewOwner, PoolPid, Socket} of
{PoolPid, PoolPid, Socket} when PoolPid =/= undefined, Socket =/= undefined ->
Transport:setopts(Socket, [{active, once}]);
_ ->
ok
end,
{keep_state, Data#conn_data{owner = NewOwner, owner_mon = NewMon}};

connected(state_timeout, idle_timeout, Data) ->
Expand All @@ -671,11 +696,25 @@ connected({call, From}, verify_socket, #conn_data{transport = Transport, socket
connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) ->
%% Socket not connected
{next_state, closed, Data, [{reply, From, {ok, closed}}]};
connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket}) ->
%% Combined state and socket check
case check_socket_health(Transport, Socket) of
ok -> {keep_state_and_data, [{reply, From, {ok, connected}}]};
{error, _} -> {keep_state_and_data, [{reply, From, {ok, closed}}]}
connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) ->
%% Check for pending close message first (from active mode)
case has_pending_close(Socket) of
true ->
%% Server closed the connection - transition to closed state
{next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]};
false ->
%% No close message pending - check socket health
case check_socket_health(Transport, Socket) of
ok ->
%% Socket is healthy - set to passive mode for checkout
%% This ensures the socket is ready for blocking recv operations
_ = Transport:setopts(Socket, [{active, false}]),
%% Flush any data messages that arrived while in active mode
flush_socket_messages(Socket),
{keep_state_and_data, [{reply, From, {ok, connected}}]};
{error, _} ->
{keep_state_and_data, [{reply, From, {ok, closed}}]}
end
end;

connected({call, From}, {upgrade_to_ssl, _SslOpts}, #conn_data{transport = hackney_ssl} = _Data) ->
Expand Down Expand Up @@ -809,6 +848,15 @@ connected(info, {tcp_error, Socket, _Reason}, #conn_data{socket = Socket} = Data
connected(info, {ssl_error, Socket, _Reason}, #conn_data{socket = Socket} = Data) ->
{next_state, closed, Data#conn_data{socket = undefined}};

%% Unexpected data received while idle (HTTP/1.1 only - HTTP/2 handled above)
%% This could be trailing data from server or protocol violation - close connection
connected(info, {tcp, Socket, _UnexpectedData}, #conn_data{socket = Socket, protocol = Protocol} = Data)
when Protocol =/= http2 ->
{next_state, closed, Data#conn_data{socket = undefined}};
connected(info, {ssl, Socket, _UnexpectedData}, #conn_data{socket = Socket, protocol = Protocol} = Data)
when Protocol =/= http2 ->
{next_state, closed, Data#conn_data{socket = undefined}};

%% HTTP/3 QUIC message handling
connected(info, {quic, ConnRef, {stream_headers, StreamId, Headers, _Fin}},
#conn_data{h3_conn = ConnRef, h3_streams = Streams} = Data) ->
Expand Down Expand Up @@ -852,7 +900,14 @@ connected(EventType, Event, Data) ->
%% State: sending - Sending request data
%%====================================================================

sending(enter, connected, _Data) ->
sending(enter, connected, #conn_data{transport = Transport, socket = Socket}) ->
%% Set socket to passive mode for blocking send/recv operations
%% (socket was in active mode while idle in connected state)
%% Note: socket may be undefined for HTTP/3 (QUIC) connections
_ = case Socket of
undefined -> ok;
_ -> Transport:setopts(Socket, [{active, false}])
end,
keep_state_and_data;

sending(internal, {send_request, Method, Path, Headers, Body}, Data) ->
Expand Down Expand Up @@ -895,7 +950,14 @@ sending(EventType, Event, Data) ->
%% State: streaming_body - Streaming request body
%%====================================================================

streaming_body(enter, connected, _Data) ->
streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket}) ->
%% Set socket to passive mode for blocking send/recv operations
%% (socket was in active mode while idle in connected state)
%% Note: socket may be undefined for HTTP/3 (QUIC) connections
_ = case Socket of
undefined -> ok;
_ -> Transport:setopts(Socket, [{active, false}])
end,
keep_state_and_data;

streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) ->
Expand Down Expand Up @@ -1083,8 +1145,12 @@ receiving(internal, do_recv_response_async, Data) ->
receiving({call, From}, body, Data) ->
%% Read full body
case read_full_body(Data, <<>>) of
{ok, Body, #conn_data{socket = undefined} = NewData} ->
%% Socket was closed during body read (e.g., no Content-Length)
%% Transition to closed state instead of connected
{next_state, closed, NewData, [{reply, From, {ok, Body}}]};
{ok, Body, NewData} ->
%% Return to connected state
%% Socket still valid - return to connected state
{next_state, connected, NewData, [{reply, From, {ok, Body}}]};
{error, Reason} ->
{next_state, closed, Data, [{reply, From, {error, Reason}}]}
Expand All @@ -1095,6 +1161,9 @@ receiving({call, From}, stream_body, Data) ->
case stream_body_chunk(Data) of
{ok, Chunk, NewData} ->
{keep_state, NewData, [{reply, From, {ok, Chunk}}]};
{done, #conn_data{socket = undefined} = NewData} ->
%% Socket was closed during body read - transition to closed state
{next_state, closed, NewData, [{reply, From, done}]};
{done, NewData} ->
{next_state, connected, NewData, [{reply, From, done}]};
{error, Reason} ->
Expand Down Expand Up @@ -1244,14 +1313,20 @@ streaming_once(EventType, Event, Data) ->
%% State: closed - Connection terminated
%%====================================================================

closed(enter, _OldState, #conn_data{socket = Socket, transport = Transport} = Data) ->
closed(enter, _OldState, #conn_data{socket = Socket, transport = Transport, pool_pid = PoolPid} = Data) ->
%% Close socket if still open
case Socket of
undefined -> ok;
_ -> Transport:close(Socket)
end,
%% Could stop here or allow reconnection
{keep_state, Data#conn_data{socket = undefined}};
%% For pooled connections, stop the process so pool can clean up
%% For non-pooled connections, stay alive to allow reconnection
case PoolPid of
undefined ->
{keep_state, Data#conn_data{socket = undefined}};
_ ->
{stop, normal, Data#conn_data{socket = undefined}}
end;

closed({call, From}, connect, Data) ->
%% Allow reconnection from closed state
Expand Down Expand Up @@ -1663,7 +1738,8 @@ stream_body_chunk(#conn_data{parser = Parser, transport = Transport, socket = So
{ok, RecvData} ->
stream_body_chunk_result(hackney_http:execute(NewParser, RecvData), Data);
{error, closed} ->
{done, Data};
%% Connection closed by server - mark socket as undefined
{done, Data#conn_data{socket = undefined}};
{error, Reason} ->
{error, Reason}
end;
Expand All @@ -1674,7 +1750,8 @@ stream_body_chunk(#conn_data{parser = Parser, transport = Transport, socket = So
%% Execute with new data and handle result
stream_body_chunk_result(hackney_http:execute(NewParser, RecvData), Data);
{error, closed} ->
{done, Data};
%% Connection closed by server - mark socket as undefined
{done, Data#conn_data{socket = undefined}};
{error, Reason} ->
{error, Reason}
end;
Expand Down Expand Up @@ -1706,23 +1783,58 @@ stream_body_chunk_result({error, Reason}, _Data) ->
recv_data(#conn_data{transport = Transport, socket = Socket, recv_timeout = Timeout}) ->
Transport:recv(Socket, 0, Timeout).

%% @private Decide, from the state being left, whether the socket should be
%% switched to active mode on entering the connected state.
%%
%% Active mode is wanted only for close detection on a connection that is truly
%% idle, i.e. one coming back from a completed request cycle (`receiving',
%% `streaming', `streaming_once') or being re-established from `closed'.
%% Every other previous state yields `false' - notably the initial connection,
%% where the server might send data before the request is sent (as in
%% pipelining or some test scenarios).
should_enable_active_mode(OldState) ->
    lists:member(OldState, [receiving, streaming, streaming_once, closed]).

%% @private Check if socket is healthy (not closed by peer)
%% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed
%% messages to detect server closes. This function now uses peername to verify the
%% socket is still valid without conflicting with active mode.
check_socket_health(Transport, Socket) ->
%% Use a non-blocking recv with 0 timeout to check for pending close
case Transport:recv(Socket, 0, 0) of
{error, timeout} ->
%% No data pending, socket is healthy
%% Use peername to check if socket is still connected
%% This works regardless of active/passive mode
Module = case Transport of
hackney_ssl -> ssl;
hackney_tcp -> inet;
_ -> inet
end,
case Module:peername(Socket) of
{ok, _} ->
%% Socket is connected
ok;
{error, closed} ->
%% Socket is closed
{error, closed};
{error, Reason} ->
%% Some other error
{error, Reason};
{ok, _ExtraData} ->
%% Unexpected data - could be server closing with data
%% Treat as ok for now - the next request will handle it
ok
%% Socket error (closed, einval, etc.)
{error, Reason}
end.

%% @private Report whether a close or error notification for `Socket' is
%% already waiting in the calling process's mailbox.
%%
%% Recognised notifications are `{tcp_closed, Socket}', `{ssl_closed, Socket}',
%% `{tcp_error, Socket, _}' and `{ssl_error, Socket, _}'. The first such message
%% found is removed from the mailbox and `true' is returned. The `after 0'
%% clause makes the check non-blocking: with no matching message the result is
%% `false' and the mailbox is left as it was.
has_pending_close(Socket) ->
    receive
        {Tag, Socket} when Tag =:= tcp_closed; Tag =:= ssl_closed ->
            true;
        {Tag, Socket, _Reason} when Tag =:= tcp_error; Tag =:= ssl_error ->
            true
    after 0 ->
        false
    end.

%% @private Discard every `{tcp, Socket, _}' and `{ssl, Socket, _}' data message
%% currently queued in the calling process's mailbox, then return `ok'.
%%
%% Intended to run right after the socket has been put back into passive mode,
%% to drop data that was delivered while it was still active. Close and error
%% notifications are not touched here; has_pending_close/1 looks for those.
flush_socket_messages(Socket) ->
    receive
        {Tag, Socket, _Payload} when Tag =:= tcp; Tag =:= ssl ->
            %% Keep draining until no data message for this socket remains.
            flush_socket_messages(Socket)
    after 0 ->
        ok
    end.

%% @private Notify pool that connection is available for reuse (async)
Expand Down
62 changes: 61 additions & 1 deletion test/hackney_pool_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ hackney_pool_integration_test_() ->
{"checkin resets owner to pool", fun test_checkin_resets_owner/0},
{"prewarm creates connections", fun test_prewarm/0},
{"queue timeout", {timeout, 120, fun test_queue_timeout/0}},
{"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}}
{"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}},
{"server close detected when idle (issue #544)", {timeout, 30, fun test_server_close_detected/0}}
]}.

setup_unit() ->
Expand Down Expand Up @@ -350,3 +351,62 @@ test_checkout_timeout() ->
?assertEqual(Error, checkout_timeout)
end,
hackney_pool:stop_pool(pool_test_timeout).

%% Regression test for issue #544: a server-side close of a connection that is
%% sitting idle in the pool must be noticed.
%%
%% Scenario: a throwaway TCP server accepts one connection and closes it on
%% request. The client connection is checked out of a fresh pool, checked back
%% in (so it is idle), and then the server closes its end. The connection
%% process is expected to see the close, terminate, and disappear from the
%% pool's free list.
test_server_close_detected() ->
    Pool = test_pool_server_close,
    TestPid = self(),

    %% Listen on an ephemeral port so the test never clashes with other tests.
    {ok, Listener} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]),
    {ok, Port} = inet:port(Listener),

    %% Server: accept one client, wait for the go-ahead, then close its side.
    ServerFun =
        fun() ->
            {ok, Accepted} = gen_tcp:accept(Listener, 5000),
            TestPid ! {server, accepted},
            %% Block until the test says the connection is idle in the pool.
            receive ready -> ok end,
            %% Closing here is what should surface as tcp_closed on the client.
            gen_tcp:close(Accepted),
            TestPid ! {server, closed},
            gen_tcp:close(Listener)
        end,
    Server = spawn_link(ServerFun),

    %% Fresh pool, then check out a connection to the test server.
    ok = hackney_pool:start_pool(Pool, [{pool_size, 5}]),
    {ok, PoolRef, Conn} =
        hackney_pool:checkout("127.0.0.1", Port, hackney_tcp, [{pool, Pool}]),
    ?assert(is_process_alive(Conn)),

    %% The server must have accepted before we go on.
    receive
        {server, accepted} -> ok
    after 5000 ->
        error(timeout_accept)
    end,

    %% Return the connection to the pool and give the checkin time to settle.
    ok = hackney_pool:checkin(PoolRef, Conn),
    timer:sleep(50),

    %% The connection should now be idle in the pool and still running.
    FreeBefore = proplists:get_value(free_count, hackney_pool:get_stats(Pool)),
    ?assertEqual(1, FreeBefore),
    ?assert(is_process_alive(Conn)),

    %% Ask the server to close its end and wait until it has done so.
    Server ! ready,
    receive
        {server, closed} -> ok
    after 5000 ->
        error(timeout_close)
    end,

    %% Allow the close notification to be delivered and handled.
    timer:sleep(150),

    %% The connection process should be gone ...
    ?assertNot(is_process_alive(Conn)),

    %% ... and no longer counted among the pool's free connections.
    FreeAfter = proplists:get_value(free_count, hackney_pool:get_stats(Pool)),
    ?assertEqual(0, FreeAfter),

    ok = hackney_pool:stop_pool(Pool).
Loading