From cbb9124a30736335fac83c2e2ce68409b00effe3 Mon Sep 17 00:00:00 2001 From: "Teppei.F" <37261985+T3pp31@users.noreply.github.com> Date: Sun, 31 May 2026 20:06:15 +0900 Subject: [PATCH] Fix AsyncSniffer not stopping promptly when timeout is set Add a control ObjectPipe for nonblocking sniffers when a timeout is set, so stop() can wake select() instead of waiting for the full timeout (#4890). Blocking sockets keep the existing control pipe behavior. Regression test uses select_objects() so ObjectPipe wake works on Windows too. Fixes #4890 AI-Assisted: yes [Cursor] Co-authored-by: Cursor --- scapy/sendrecv.py | 7 +++--- test/answering_machines.uts | 46 +++++++++++++++++++------------------ test/regression.uts | 33 ++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 2ab413eb2c7..1c9f92a2375 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -1296,8 +1296,10 @@ def _run(self, "will be the one of the first socket") close_pipe = None # type: Optional[ObjectPipe[None]] - if not nonblocking_socket: - # select is blocking: Add special control socket + if not nonblocking_socket or timeout is not None: + # Blocking select needs a control socket to wake up select(). + # Nonblocking sockets with a timeout also need it so that stop() + # does not wait for the remaining timeout (#4890). from scapy.automaton import ObjectPipe close_pipe = ObjectPipe[None]("control_socket") sniff_sockets[close_pipe] = "control_socket" # type: ignore @@ -1309,7 +1311,6 @@ def stop_cb(): self.continue_sniff = False self.stop_cb = stop_cb else: - # select is non blocking def stop_cb(): # type: () -> None self.continue_sniff = False diff --git a/test/answering_machines.uts b/test/answering_machines.uts index 7aa36eee261..7f1d7cd0a7f 100644 --- a/test/answering_machines.uts +++ b/test/answering_machines.uts @@ -9,20 +9,21 @@ = Generic answering machine mocker from unittest import mock -@mock.patch("scapy.ansmachine.sniff") -def test_am(cls_name, packet_query, check_reply, mock_sniff, **kargs): - packet_query = packet_query.__class__(bytes(packet_query)) - def sniff(*args,**kargs): - kargs["prn"](packet_query) - mock_sniff.side_effect = sniff - am = cls_name(**kargs) - called = [False] - def _sndrpl(x): - called[0] = True - check_reply(x.__class__(bytes(x))) - am.send_reply = _sndrpl - am() - assert called[0], "Filter never passed for AnsweringMachine !" + +def test_am(cls_name, packet_query, check_reply, **kargs): + with mock.patch("scapy.ansmachine.sniff") as mock_sniff: + packet_query = packet_query.__class__(bytes(packet_query)) + def sniff(*args,**kargs): + kargs["prn"](packet_query) + mock_sniff.side_effect = sniff + am = cls_name(**kargs) + called = [False] + def _sndrpl(x): + called[0] = True + check_reply(x.__class__(bytes(x))) + am.send_reply = _sndrpl + am() + assert called[0], "Filter never passed for AnsweringMachine !" = BOOT_am @@ -248,14 +249,15 @@ a.print_reply(req, res) = WiFi_am from unittest import mock -@mock.patch("scapy.layers.dot11.sniff") -def test_WiFi_am(packet_query, check_reply, mock_sniff, **kargs): - def sniff(*args,**kargs): - kargs["prn"](packet_query) - mock_sniff.side_effect = sniff - am = WiFi_am(**kargs) - am.send_reply = check_reply - am() + +def test_WiFi_am(packet_query, check_reply, **kargs): + with mock.patch("scapy.layers.dot11.sniff") as mock_sniff: + def sniff(*args,**kargs): + kargs["prn"](packet_query) + mock_sniff.side_effect = sniff + am = WiFi_am(**kargs) + am.send_reply = check_reply + am() def check_WiFi_am_reply(packet): assert isinstance(packet, list) and len(packet) == 2 diff --git a/test/regression.uts b/test/regression.uts index 8f0592b486d..fa717cfb9b2 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -1779,6 +1779,39 @@ try: except ValueError: assert True += AsyncSniffer early stop with timeout on nonblocking socket (#4890) + +import time +from scapy.automaton import ObjectPipe, select_objects +from scapy.supersocket import SuperSocket + +class NonblockingSelectSocket(SuperSocket): + nonblocking_socket = True + def __init__(self): + self.ins = ObjectPipe(name="dummy_sniff_socket") + self.outs = None + def recv(self, x=65535, **kwargs): + self.ins.recv() + return None + @staticmethod + def select(sockets, remain=None): + return select_objects(sockets, remain) + def close(self): + if self.closed: + return + self.closed = True + self.ins.close() + +sock = NonblockingSelectSocket() +sniffer = AsyncSniffer(opened_socket=sock, timeout=10, count=0) +sniffer.start() +time.sleep(0.2) +sniffer.stop(join=False) +sniffer.thread.join(timeout=1) +assert not sniffer.running +assert not sniffer.thread.is_alive() +sock.close() + = Sending a TCP syn 'forever' at layer 2 and layer 3 ~ netaccess needs_root IP def _test():