-
Notifications
You must be signed in to change notification settings - Fork 169
Fix broker disconnect to better handle SIGPIPE #548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -631,6 +631,9 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock, | |
| BROKER_SOCKET_T* client_sock) | ||
| { | ||
| BROKER_SOCKET_T fd; | ||
| #ifdef SO_NOSIGPIPE | ||
| int on = 1; | ||
| #endif | ||
| (void)ctx; | ||
|
|
||
| fd = accept(listen_sock, NULL, NULL); | ||
|
|
@@ -644,6 +647,14 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock, | |
| close(fd); | ||
| return MQTT_CODE_ERROR_SYSTEM; | ||
| } | ||
| #ifdef SO_NOSIGPIPE | ||
| /* macOS / BSDs: suppress SIGPIPE on writes to a peer-closed socket. | ||
| * Without this (and without MSG_NOSIGNAL in send()), a client that | ||
| * publishes QoS>0 and immediately closes its socket would cause the | ||
| * broker's PUBACK/PUBREC write to deliver SIGPIPE, terminating the | ||
| * broker. Linux uses MSG_NOSIGNAL in BrokerPosix_Write instead. */ | ||
| (void)setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); | ||
| #endif | ||
| *client_sock = fd; | ||
| return MQTT_CODE_SUCCESS; | ||
| } | ||
|
|
@@ -714,7 +725,16 @@ static int BrokerPosix_Write(void* ctx, BROKER_SOCKET_T sock, | |
| return MQTT_CODE_ERROR_NETWORK; | ||
| } | ||
|
|
||
| /* MSG_NOSIGNAL (Linux/BSDs that define it) prevents SIGPIPE delivery when | ||
| * the peer has already closed the connection - the syscall just returns | ||
| * EPIPE and we treat it as a normal network error. Platforms without | ||
| * MSG_NOSIGNAL (e.g. macOS) rely on the SO_NOSIGPIPE socket option set | ||
| * in BrokerPosix_Accept. */ | ||
| #ifdef MSG_NOSIGNAL | ||
| rc = (int)send(sock, buf, (size_t)buf_len, MSG_NOSIGNAL); | ||
| #else | ||
| rc = (int)send(sock, buf, (size_t)buf_len, 0); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟠 [Medium] Fallback write path still allows SIGPIPE · Logic errors
Fix: Add a fallback SIGPIPE suppression path or fail initialization when no per-send or per-socket suppression is available. |
||
| #endif | ||
| if (rc <= 0) { | ||
| if (rc < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { | ||
| return MQTT_CODE_CONTINUE; | ||
|
|
@@ -4906,6 +4926,13 @@ int wolfmqtt_broker(int argc, char** argv) | |
| g_broker_shutdown = 0; | ||
| signal(SIGINT, broker_signal_handler); | ||
| signal(SIGTERM, broker_signal_handler); | ||
| /* Belt-and-suspenders for the SIGPIPE-on-peer-close path. The socket | ||
| * layer already uses MSG_NOSIGNAL / SO_NOSIGPIPE per platform, but | ||
| * ignore SIGPIPE process-wide too so any reused or custom net callback | ||
| * cannot kill the broker on a write to a closed peer. */ | ||
| #ifdef SIGPIPE | ||
| signal(SIGPIPE, SIG_IGN); | ||
| #endif | ||
|
|
||
| rc = MqttBroker_Start(&broker); | ||
| if (rc == MQTT_CODE_SUCCESS) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1102,6 +1102,281 @@ TEST(qos2_pubrel_unknown_id_still_pubcomps) | |
| MqttBroker_Free(&broker); | ||
| } | ||
|
|
||
| /* Regression for the SIGPIPE-on-PUBREC-write bug. When a publisher sent a | ||
| * QoS 2 PUBLISH and immediately closed its socket, the broker's subsequent | ||
| * write of PUBREC into the peer-closed socket would deliver SIGPIPE and | ||
| * terminate the broker process (default SIGPIPE disposition is SIGTERM). | ||
| * The fix uses MSG_NOSIGNAL (Linux/BSDs) and SO_NOSIGPIPE (macOS) on the | ||
| * broker's POSIX socket layer, plus an explicit signal(SIGPIPE, SIG_IGN) | ||
| * in the standalone broker main as belt-and-suspenders. | ||
| * | ||
| * The mock-net used by these unit tests never generates SIGPIPE - this | ||
| * test pins the protocol-level state-machine path (orphaned subscriber + | ||
| * publisher publishes QoS 2 + immediate DISCONNECT) so a future regression | ||
| * in the QoS 2 dispatch is caught alongside the wire-level SIGPIPE fix | ||
| * verified end-to-end with the paho reproducer the reporter provided. */ | ||
| TEST(qos2_publish_with_offline_durable_subscriber) | ||
| { | ||
| MqttBroker broker; | ||
| MqttBrokerNet net; | ||
| int i; | ||
| int pub_connacks; | ||
| int pub_pubrecs; | ||
|
|
||
| /* Subscriber CONNECT: ClientId "S", clean_session=0 (flags=0x00). */ | ||
| static const byte connect_sub[] = { | ||
| 0x10, 0x0D, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x04, 0x00, 0x00, 0x3C, | ||
| 0x00, 0x01, 'S' | ||
| }; | ||
| /* SUBSCRIBE packet_id=1, filter "x", QoS 2. */ | ||
| static const byte subscribe_x[] = { | ||
| 0x82, 0x06, | ||
| 0x00, 0x01, | ||
| 0x00, 0x01, 'x', | ||
| 0x02 | ||
| }; | ||
| /* Clean DISCONNECT. */ | ||
| static const byte disconnect[] = { 0xE0, 0x00 }; | ||
| /* Publisher CONNECT: ClientId "P", clean_session=0. */ | ||
| static const byte connect_pub[] = { | ||
| 0x10, 0x0D, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x04, 0x00, 0x00, 0x3C, | ||
| 0x00, 0x01, 'P' | ||
| }; | ||
| /* PUBLISH QoS 2, packet_id=7, topic "x", payload "p". | ||
| * remain = 2+1+2+1 = 6 */ | ||
| static const byte publish[] = { | ||
| 0x34, 0x06, | ||
| 0x00, 0x01, 'x', | ||
| 0x00, 0x07, | ||
| 'p' | ||
| }; | ||
|
|
||
| install_mock_net(&net); | ||
| XMEMSET(&broker, 0, sizeof(broker)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); | ||
|
|
||
| /* Phase 1: subscriber connects, subscribes, then disconnects cleanly. | ||
| * After this the broker should have an orphaned sub with client=NULL | ||
| * and client_id="S" still in broker->subs. */ | ||
| reset_mock_clients(2); | ||
| mock_client_input_append(0, connect_sub, sizeof(connect_sub)); | ||
| mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); | ||
| mock_client_input_append(0, disconnect, sizeof(disconnect)); | ||
| for (i = 0; i < 16; i++) { | ||
| MqttBroker_Step(&broker); | ||
| if (g_clients[0].closed) { | ||
| break; | ||
| } | ||
| } | ||
| ASSERT_TRUE(g_clients[0].closed); | ||
|
|
||
| /* Phase 2: publisher connects, publishes QoS 2, and disconnects - all | ||
| * bytes appended in one shot so the broker may read PUBLISH + DISCONNECT | ||
| * back-to-back with no Step() between, exactly as the paho client does | ||
| * without a sleep before disconnect(). */ | ||
| mock_client_input_append(1, connect_pub, sizeof(connect_pub)); | ||
| mock_client_input_append(1, publish, sizeof(publish)); | ||
| mock_client_input_append(1, disconnect, sizeof(disconnect)); | ||
| for (i = 0; i < 32; i++) { | ||
| MqttBroker_Step(&broker); | ||
| if (g_clients[1].closed) { | ||
| break; | ||
| } | ||
| } | ||
| ASSERT_TRUE(g_clients[1].closed); | ||
|
|
||
| /* Sanity: the broker did process the PUBLISH (one PUBREC out) before | ||
| * tearing down the publisher. The orphaned sub is offline so no | ||
| * forwarded PUBLISH on g_clients[0]. The strong assertion is that | ||
| * MqttBroker_Stop/Free do not crash or trip ASan below. */ | ||
| pub_connacks = count_packets_of_type(g_clients[1].out_buf, | ||
| g_clients[1].out_len, MQTT_PACKET_TYPE_CONNECT_ACK); | ||
| pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, | ||
| g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); | ||
| ASSERT_EQ(1, pub_connacks); | ||
| ASSERT_EQ(1, pub_pubrecs); | ||
|
|
||
| MqttBroker_Stop(&broker); | ||
| MqttBroker_Free(&broker); | ||
| } | ||
|
|
||
| /* Same crash scenario as above but the publisher's TCP goes away abruptly | ||
| * (read returns network error) instead of sending a clean DISCONNECT. | ||
| * Exercises the BrokerClient_AbnormalClose branch. */ | ||
| TEST(qos2_publish_then_abrupt_close_offline_subscriber) | ||
| { | ||
| MqttBroker broker; | ||
| MqttBrokerNet net; | ||
| int i; | ||
| int pub_pubrecs; | ||
| MockClient* mc; | ||
|
|
||
| static const byte connect_sub[] = { | ||
| 0x10, 0x0D, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x04, 0x00, 0x00, 0x3C, | ||
| 0x00, 0x01, 'S' | ||
| }; | ||
| static const byte subscribe_x[] = { | ||
| 0x82, 0x06, 0x00, 0x01, 0x00, 0x01, 'x', 0x02 | ||
| }; | ||
| static const byte disconnect[] = { 0xE0, 0x00 }; | ||
| static const byte connect_pub[] = { | ||
| 0x10, 0x0D, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x04, 0x00, 0x00, 0x3C, | ||
| 0x00, 0x01, 'P' | ||
| }; | ||
| static const byte publish[] = { | ||
| 0x34, 0x06, 0x00, 0x01, 'x', 0x00, 0x07, 'p' | ||
| }; | ||
|
|
||
| install_mock_net(&net); | ||
| XMEMSET(&broker, 0, sizeof(broker)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); | ||
|
|
||
| reset_mock_clients(2); | ||
| mock_client_input_append(0, connect_sub, sizeof(connect_sub)); | ||
| mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); | ||
| mock_client_input_append(0, disconnect, sizeof(disconnect)); | ||
| for (i = 0; i < 16; i++) { | ||
| MqttBroker_Step(&broker); | ||
| if (g_clients[0].closed) break; | ||
| } | ||
| ASSERT_TRUE(g_clients[0].closed); | ||
|
|
||
| /* Publisher: CONNECT + PUBLISH, then no more bytes available. Drain the | ||
| * input by setting in_pos = in_len after the PUBLISH; subsequent reads | ||
| * return TIMEOUT, which the broker's keepalive check would eventually | ||
| * trip - but more importantly, marking the mock as `closed` mid-stream | ||
| * causes the read to return TIMEOUT and then the next call to fail. | ||
| * Instead we simulate an abrupt FIN by appending zero further bytes and | ||
| * letting the in_buf drain, then forcing the mock to report closed so | ||
| * the next mock_read returns immediately. */ | ||
| mock_client_input_append(1, connect_pub, sizeof(connect_pub)); | ||
| mock_client_input_append(1, publish, sizeof(publish)); | ||
| /* Drive enough Steps to process CONNECT and PUBLISH. */ | ||
| for (i = 0; i < 8; i++) { | ||
| MqttBroker_Step(&broker); | ||
| } | ||
| /* Now simulate the peer hanging up: mark the mock as closed so any | ||
| * further mock_read returns TIMEOUT immediately. The broker's keepalive | ||
| * with our mocked WOLFMQTT_BROKER_GET_TIME_S()==0 won't fire, so the | ||
| * test relies on the publisher having no more outgoing handshake bytes | ||
| * after PUBREC. The strong assertion is that ASan finds no UAF when we | ||
| * tear down the broker while the publisher is still in qos2_pending. */ | ||
| mc = &g_clients[1]; | ||
| mc->closed = 1; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔵 [Low] Abrupt-close test never exercises abnormal close · Weak or missing assertions
Fix: Make the mock return |
||
| for (i = 0; i < 4; i++) { | ||
| MqttBroker_Step(&broker); | ||
| } | ||
|
|
||
| pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, | ||
| g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); | ||
| ASSERT_EQ(1, pub_pubrecs); | ||
|
|
||
| /* The broker did not detect the close (no read failure), so the | ||
| * publisher's BrokerClient is still in the list with packet_id=7 in | ||
| * qos2_pending. MqttBroker_Free must walk that list and free the QoS 2 | ||
| * state without crashing. */ | ||
| MqttBroker_Stop(&broker); | ||
| MqttBroker_Free(&broker); | ||
| } | ||
|
|
||
| #ifdef WOLFMQTT_V5 | ||
| /* v5 variant: same orphan-then-publish-then-disconnect pattern but the | ||
| * PUBLISH carries a property block. Tests the suspected use-after-free in | ||
| * the fan-out path where pub.props is shared across subscribers and then | ||
| * freed at the end of BrokerHandle_Publish. With only an orphaned sub | ||
| * matching, the fan-out body does not execute, but the broker's MqttClient | ||
| * still has to decode and free the props. */ | ||
| TEST(qos2_publish_v5_props_with_offline_durable_subscriber) | ||
| { | ||
| MqttBroker broker; | ||
| MqttBrokerNet net; | ||
| int i; | ||
| int pub_pubrecs; | ||
|
|
||
| /* v5 CONNECT subscriber, ClientId "S", clean_session=0, props_len=0. | ||
| * remain = MQTT_hdr(6)+level(1)+flags(1)+keepalive(2)+props_len(1)+ | ||
| * clientid_hdr(2)+clientid(1) = 14 */ | ||
| static const byte connect_sub_v5[] = { | ||
| 0x10, 0x0E, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x05, 0x00, 0x00, 0x3C, | ||
| 0x00, | ||
| 0x00, 0x01, 'S' | ||
| }; | ||
| /* v5 SUBSCRIBE packet_id=1, props_len=0, filter "x", options=QoS 2. */ | ||
| static const byte subscribe_v5[] = { | ||
| 0x82, 0x07, | ||
| 0x00, 0x01, | ||
| 0x00, | ||
| 0x00, 0x01, 'x', | ||
| 0x02 | ||
| }; | ||
| /* v5 DISCONNECT (0xE0 0x00 with no reason / props - allowed). */ | ||
| static const byte disconnect_v5[] = { 0xE0, 0x00 }; | ||
| /* v5 CONNECT publisher, ClientId "P". remain = 14 (same as subscriber). */ | ||
| static const byte connect_pub_v5[] = { | ||
| 0x10, 0x0E, | ||
| 0x00, 0x04, 'M', 'Q', 'T', 'T', | ||
| 0x05, 0x00, 0x00, 0x3C, | ||
| 0x00, | ||
| 0x00, 0x01, 'P' | ||
| }; | ||
| /* v5 PUBLISH QoS 2 packet_id=7, topic "x", one Message Expiry Interval | ||
| * property (id=0x02, 4-byte int=60), payload "p". | ||
| * remain = topic_hdr(2)+topic(1)+packet_id(2)+prop_len_vbi(1)+prop(5)+ | ||
| * payload(1) = 12 */ | ||
| static const byte publish_v5[] = { | ||
| 0x34, 0x0C, | ||
| 0x00, 0x01, 'x', | ||
| 0x00, 0x07, | ||
| 0x05, /* prop block length = 5 */ | ||
| 0x02, 0x00, 0x00, 0x00, 0x3C, /* Message Expiry Interval = 60 */ | ||
| 'p' | ||
| }; | ||
|
|
||
| install_mock_net(&net); | ||
| XMEMSET(&broker, 0, sizeof(broker)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); | ||
| ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); | ||
|
|
||
| reset_mock_clients(2); | ||
| mock_client_input_append(0, connect_sub_v5, sizeof(connect_sub_v5)); | ||
| mock_client_input_append(0, subscribe_v5, sizeof(subscribe_v5)); | ||
| mock_client_input_append(0, disconnect_v5, sizeof(disconnect_v5)); | ||
| for (i = 0; i < 16; i++) { | ||
| MqttBroker_Step(&broker); | ||
| if (g_clients[0].closed) break; | ||
| } | ||
| ASSERT_TRUE(g_clients[0].closed); | ||
|
|
||
| mock_client_input_append(1, connect_pub_v5, sizeof(connect_pub_v5)); | ||
| mock_client_input_append(1, publish_v5, sizeof(publish_v5)); | ||
| mock_client_input_append(1, disconnect_v5, sizeof(disconnect_v5)); | ||
| for (i = 0; i < 32; i++) { | ||
| MqttBroker_Step(&broker); | ||
| if (g_clients[1].closed) break; | ||
| } | ||
| ASSERT_TRUE(g_clients[1].closed); | ||
|
|
||
| pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, | ||
| g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); | ||
| ASSERT_EQ(1, pub_pubrecs); | ||
|
|
||
| MqttBroker_Stop(&broker); | ||
| MqttBroker_Free(&broker); | ||
| } | ||
| #endif /* WOLFMQTT_V5 */ | ||
|
|
||
| /* MQTT 3.1.1 section 3.12 / v5 section 3.12: PINGREQ has no variable header and no | ||
| * payload, so Remaining Length MUST be 0. Broker dispatch must reject a | ||
| * malformed PINGREQ with an abnormal close instead of emitting a | ||
|
|
@@ -2156,6 +2431,11 @@ int main(int argc, char** argv) | |
| RUN_TEST(qos2_inbound_cap_reached_disconnects); | ||
| RUN_TEST(qos2_state_freed_on_client_disconnect); | ||
| RUN_TEST(qos2_pubrel_unknown_id_still_pubcomps); | ||
| RUN_TEST(qos2_publish_with_offline_durable_subscriber); | ||
| RUN_TEST(qos2_publish_then_abrupt_close_offline_subscriber); | ||
| #ifdef WOLFMQTT_V5 | ||
| RUN_TEST(qos2_publish_v5_props_with_offline_durable_subscriber); | ||
| #endif | ||
| RUN_TEST(pingreq_valid_emits_pingresp); | ||
| RUN_TEST(pingreq_nonzero_remain_len_closes_no_pingresp); | ||
| #ifndef WOLFMQTT_V5 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔵 [Low] SIGPIPE suppression can silently remain disabled · Resource leaks
BrokerPosix_AcceptignoresSO_NOSIGPIPEsetup failure, so accepted sockets can still reach the zero-flagsend()path and let a peer-closed write terminate the broker withSIGPIPE.Fix: Check
setsockoptand reject the accepted socket, or install a backend fallback before returning success.