From e69864861e3db80c87872625942c2a17f2c4987e Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sun, 22 Feb 2026 14:22:24 -0500 Subject: [PATCH 1/9] isotp: fix soft socket .select() drops ObjectPipe, causing sr1() to hang in threaded mode The select() method was filtering out ObjectPipe instances (like the sniffer's close_pipe) from its return value. This prevented the sniffer's stop mechanism from working correctly in threaded mode - when sniffer.stop() sent to close_pipe, the select() method would unblock but not return the close_pipe, so the sniffer loop couldn't detect the stop signal and had to rely on continue_sniff timing, causing hangs under load. The fix includes close_pipe (ObjectPipe) instances in the select return value, so the sniffer loop properly detects the stop signal via the 'if s is close_pipe: break' check. Added two new tests: - sr1 timeout with threaded=True (no response scenario) - sr1 timeout with threaded=True and background CAN traffic The new "ISOTPSoftSocket select returns control ObjectPipe" test directly verifies that ISOTPSoftSocket.select() passes through ready ObjectPipe instances (e.g. the sniffer's close_pipe). This test deterministically FAILS without the fix and PASSES with it. The integration tests (sr1 timeout with threaded=True) are kept for end-to-end coverage but the race window is too narrow on Linux with TestSocket to reliably trigger the bug. 
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Ben Gardiner --- scapy/contrib/isotp/isotp_soft_socket.py | 12 ++- test/contrib/isotp_soft_socket.uts | 96 ++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 4182d445336..6ccb73665b8 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -202,8 +202,8 @@ def recv(self, x=0xffff, **kwargs): return msg @staticmethod - def select(sockets, remain=None): - # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] + def select(sockets, remain=None): # type: ignore[override] + # type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] # noqa: E501 """This function is called during sendrecv() routine to wait for sockets to be ready to receive """ @@ -214,8 +214,12 @@ def select(sockets, remain=None): ready_pipes = select_objects(obj_pipes, remain) - return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and - not x.closed and x.impl.rx_queue in ready_pipes] + result: List[Union[SuperSocket, ObjectPipe[Any]]] = [ + x for x in sockets if isinstance(x, ISOTPSoftSocket) and + not x.closed and x.impl.rx_queue in ready_pipes] + result += [x for x in sockets if isinstance(x, ObjectPipe) and + x in ready_pipes] + return result class TimeoutScheduler: diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index 2e7bdfaeccf..0f0a3992fda 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -954,6 +954,102 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s assert rx2 is None += ISOTPSoftSocket select returns control ObjectPipe + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") 
+close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x123, 0x321) as sock: + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result + +close_pipe.close() + += ISOTPSoftSocket select returns control ObjectPipe alongside ready rx_queue + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") +close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + sock.impl.rx_queue.send((b'\x62\xF1\x90\x41\x42\x43', 0.0)) + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result +assert sock in result + +close_pipe.close() + += ISOTPSoftSocket sr1 SF request with MF response threaded + +from threading import Thread + +request = ISOTP(b'\x22\xF1\x90') +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' +response_msg = ISOTP(response_data) + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x641, 0x241) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x241, 0x641) as sock_rx: + isocan_rx.pair(isocan_tx) + def responder(): + sniffed = sock_rx.sniff(count=1, timeout=5) + if sniffed: + sock_rx.send(response_msg) + resp_thread = Thread(target=responder, daemon=True) + resp_thread.start() + time.sleep(0.1) + rx = sock_tx.sr1(request, timeout=5, verbose=False, threaded=True) + resp_thread.join(timeout=3) + +assert rx is not None +assert rx.data == response_data + += ISOTPSoftSocket sr1 timeout with threaded=True + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + 
elapsed = time.time() - start + +assert rx2 is None +assert elapsed < 5 + += ISOTPSoftSocket sr1 timeout with threaded=True and background traffic + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + stop_traffic = Event() + def bg_traffic(): + while not stop_traffic.is_set(): + try: + isocan_rx.send(CAN(identifier=0x456, data=dhex("01 02 03"))) + except Exception: + break + time.sleep(0.01) + traffic_thread = Thread(target=bg_traffic, daemon=True) + traffic_thread.start() + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + elapsed = time.time() - start + stop_traffic.set() + traffic_thread.join(timeout=2) + +assert rx2 is None +assert elapsed < 5 + = ISOTPSoftSocket sniff msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') From cc5b3062406927f766ec2aa7d1df93a61feba172 Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 10:39:49 -0500 Subject: [PATCH 2/9] isotp: fix potential cause of intermittent test failures where soft socket is garbage collected --- scapy/contrib/isotp/isotp_soft_socket.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 6ccb73665b8..5cfb1689308 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -166,7 +166,8 @@ def __init__(self, def close(self): # type: () -> None if not self.closed: - self.impl.close() + if hasattr(self, "impl"): + self.impl.close() self.closed = True def failure_analysis(self): From e5ace92033b4863b90876c33e75f247470c27056 Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 13:35:34 -0500 Subject: [PATCH 3/9] isotpsoft, test: try 
hard to cleanup background threads in tests --- test/contrib/isotp_soft_socket.uts | 224 ++++++++++++++++++++++++++++- 1 file changed, 222 insertions(+), 2 deletions(-) diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index 0f0a3992fda..be4a20cad24 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -1003,7 +1003,14 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x641, 0x241) as s resp_thread.start() time.sleep(0.1) rx = sock_tx.sr1(request, timeout=5, verbose=False, threaded=True) - resp_thread.join(timeout=3) + resp_thread.join(timeout=5) + assert not resp_thread.is_alive(), "resp_thread still alive" + # Stop TimeoutScheduler while sockets are still open to avoid + # callbacks crashing on closed sockets and writing to stderr. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) assert rx is not None assert rx.data == response_data @@ -1019,6 +1026,11 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s start = time.time() rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) elapsed = time.time() - start + # Stop TimeoutScheduler while sockets are still open. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) assert rx2 is None assert elapsed < 5 @@ -1045,7 +1057,13 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) elapsed = time.time() - start stop_traffic.set() - traffic_thread.join(timeout=2) + traffic_thread.join(timeout=5) + assert not traffic_thread.is_alive(), "traffic_thread still alive" + # Stop TimeoutScheduler while sockets are still open. 
+ _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) assert rx2 is None assert elapsed < 5 @@ -1357,12 +1375,214 @@ with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241 res.show() assert res.data == dhex("01 02 03 04 05 06 07 08 09") + ++ MF response via sr1() cartesian product tests +# Background traffic from pcap: 3 periodic IDs (0x062, 0x024, 0x039) every +# 10ms, plus a burst of 9 additional IDs every 100ms. +# ECU response latency: ~0.6ms after SF request (from pcap frame 385). +# CF timing after FC: CF1 +8ms, CF2 +10ms, CF3 +10ms (from pcap). +# Expected ISOTP data: "62 00 01 flag{UDS_DATA_READ}" (22 bytes). +# +# Cartesian product dimensions: +# threaded: {False, True} - sr1() threading mode +# can_filters: {[0x7eb], None} - per-socket filtering vs. no filtering +# adapter: {limited (slcan-like), unlimited (candle-like)} +# +# slcan model parameters (from real hardware testing): +# frame_delay=0.0025: ~2.5ms per serial read at 115200 baud +# serial_timeout=0.1: python-can slcan Serial(timeout=0.1) blocks 100ms +# when serial buffer is empty +# read_time_limit=0.02: SocketMapper.READ_BUS_TIME_LIMIT = 20ms caps +# total read time per mux call +# prefill_frames=200: OS serial buffer backlog from busy CAN bus +# +# All tests use retry=0, timeout=1.0. All should PASS with the fix +# (can_filters stripped from raw Bus, per-socket filtering in mux, +# read_bus time-limited to avoid TimeoutScheduler thread starvation). 
+ += MF response helper setup for cartesian product tests + +from threading import Thread, Event + +def run_mf_response_test(frame_delay, mux_throttle, filters_kwarg, threaded, + prefill_frames=0, serial_timeout=0.0, + read_time_limit=0.0, interface_name="slcan"): + import time as _time + from threading import Thread as _Thread, Event as _Event + from scapy.layers.can import CAN as _CAN + from scapy.contrib.isotp import ISOTP as _ISOTP + from scapy.contrib.isotp.isotp_soft_socket import ISOTPSoftSocket as _ISOTPSoftSocket + from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler as _TimeoutScheduler + from test.testsocket import TestSocket as _TestSocket, SlowTestSocket as _SlowTestSocket + _dhex = bytes.fromhex + response_data = _dhex("620001666c61677b5544535f444154415f524541447d") + bg_periodic = [0x062, 0x024, 0x039] + bg_burst = [0x1d3, 0x024, 0x039, 0x077, 0x098, 0x150, 0x1a7, 0x1b8, 0x1bb] + if frame_delay > 0: + sock_cls = _SlowTestSocket + sock_kwargs = dict(frame_delay=frame_delay, mux_throttle=mux_throttle, + serial_timeout=serial_timeout, + read_time_limit=read_time_limit, + interface_name=interface_name, + **filters_kwarg) + else: + sock_cls = _TestSocket + sock_kwargs = {} + with sock_cls(_CAN, **sock_kwargs) as isocan, \ + _TestSocket(_CAN) as ecu_mon, \ + _ISOTPSoftSocket(isocan, tx_id=0x7e3, rx_id=0x7eb) as sock: + with _TestSocket(_CAN) as stim: + stim.pair(isocan) + isocan.pair(ecu_mon) + # Pre-fill the serial buffer with background frames to + # simulate a real slcan adapter that has been connected to + # a busy CAN bus. On real hardware the OS serial buffer + # accumulates hundreds of frames before the ISOTP exchange. 
+ for _ in range(prefill_frames): + bid = bg_periodic[_ % len(bg_periodic)] + stim.send(_CAN(identifier=bid, data=bytes(8))) + fc_received = _Event() + stop = _Event() + bg_cycle = [0] + def bg_generator(): + while not stop.is_set(): + for bid in bg_periodic: + if stop.is_set(): + return + stim.send(_CAN(identifier=bid, data=bytes(8))) + bg_cycle[0] += 1 + if bg_cycle[0] % 10 == 0: + for bid in bg_burst: + if stop.is_set(): + return + stim.send(_CAN(identifier=bid, data=bytes(8))) + _time.sleep(0.010) + def ecu_simulation(): + _time.sleep(0.05) + stim.send(_CAN(identifier=0x7eb, data=_dhex("1016620001666c61"))) + fc_received.wait(timeout=10.0) + if not fc_received.is_set(): + return + _time.sleep(0.008) + stim.send(_CAN(identifier=0x7eb, data=_dhex("21677b5544535f44"))) + _time.sleep(0.010) + stim.send(_CAN(identifier=0x7eb, data=_dhex("224154415f524541"))) + _time.sleep(0.010) + stim.send(_CAN(identifier=0x7eb, data=_dhex("23447d"))) + def fc_watcher(): + while not stop.is_set(): + if _TestSocket.select([ecu_mon], 0.1): + pkt = ecu_mon.recv() + if pkt is not None and pkt.identifier == 0x7e3 and \ + len(pkt.data) >= 1 and bytes(pkt.data)[0] == 0x30: + fc_received.set() + return + bg_thread = _Thread(target=bg_generator) + ecu_thread = _Thread(target=ecu_simulation) + fc_thread = _Thread(target=fc_watcher) + bg_thread.start() + ecu_thread.start() + fc_thread.start() + result = sock.sr1(_ISOTP(data=_dhex("220001")), + retry=0, timeout=10.0, + threaded=threaded, verbose=0) + stop.set() + fc_received.set() + bg_thread.join(timeout=5) + ecu_thread.join(timeout=5) + fc_thread.join(timeout=5) + assert not bg_thread.is_alive(), "bg_thread still alive" + assert not ecu_thread.is_alive(), "ecu_thread still alive" + assert not fc_thread.is_alive(), "fc_thread still alive" + # Stop TimeoutScheduler while sockets are still open to + # avoid callbacks crashing on closed sockets and writing + # to stderr (causes fatal error on Python 3.13 Windows). 
+ _ts_thread = _TimeoutScheduler._thread + _TimeoutScheduler.clear() + if _ts_thread is not None: + _ts_thread.join(timeout=5) + return result, response_data + += MF response: candle-like unlimited, no can_filters, threaded=False + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg={}, threaded=False, interface_name="candle") +assert result is not None, "MF response not received (candle, no filters, threaded=False)" +assert result.data == expected + += MF response: candle-like unlimited, no can_filters, threaded=True + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg={}, threaded=True, interface_name="candle") +assert result is not None, "MF response not received (candle, no filters, threaded=True)" +assert result.data == expected + += MF response: candle-like unlimited, can_filters=[0x7eb], threaded=False + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg=dict(can_filters=[0x7eb]), threaded=False, + interface_name="candle") +assert result is not None, "MF response not received (candle, can_filters, threaded=False)" +assert result.data == expected + += MF response: candle-like unlimited, can_filters=[0x7eb], threaded=True + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg=dict(can_filters=[0x7eb]), threaded=True, + interface_name="candle") +assert result is not None, "MF response not received (candle, can_filters, threaded=True)" +assert result.data == expected + += MF response: slcan-like limited, no can_filters, threaded=False + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg={}, threaded=False, + prefill_frames=200) +assert result is not None, "MF response not received (slcan, no filters, threaded=False)" +assert result.data == expected + += MF response: slcan-like limited, no can_filters, threaded=True + 
+result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg={}, threaded=True, + prefill_frames=200) +assert result is not None, "MF response not received (slcan, no filters, threaded=True)" +assert result.data == expected + += MF response: slcan-like limited, can_filters=[0x7eb], threaded=False + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg=dict(can_filters=[0x7eb]), + threaded=False, prefill_frames=200) +assert result is not None, "MF response not received (slcan, can_filters, threaded=False)" +assert result.data == expected + += MF response: slcan-like limited, can_filters=[0x7eb], threaded=True + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg=dict(can_filters=[0x7eb]), + threaded=True, prefill_frames=200) +assert result is not None, "MF response not received (slcan, can_filters, threaded=True)" +assert result.data == expected + + + Cleanup = Delete testsockets cleanup_testsockets() +_ts = TimeoutScheduler._thread TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) log_runtime.removeHandler(handler) From 599e23f7ebc9537a6ab3ca428702954e8d402dac Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 13:36:21 -0500 Subject: [PATCH 4/9] isotpsoft, test: sr1() soft socket tests incl MF resp, SF req on slow (slcan) interface introduce multiple tests to confirm that all the combinations of filters, threading, slow/fast interfaces work with the isotpsoft socket in the particularly problematic case of a SF request yielding an MF response. 
The new tests currently fail for slow (slcan) interfaces --- test/contrib/isotp_soft_socket.uts | 86 ++++++++++++++- test/testsocket.py | 166 +++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+), 1 deletion(-) diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index be4a20cad24..f112563d62a 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -10,7 +10,7 @@ from io import BytesIO from scapy.layers.can import * from scapy.contrib.isotp import * from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler -from test.testsocket import TestSocket, cleanup_testsockets +from test.testsocket import TestSocket, SlowTestSocket, cleanup_testsockets with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) @@ -1068,6 +1068,90 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s assert rx2 is None assert elapsed < 5 += ISOTPSoftSocket sr1 SF request with MF response threaded and background traffic on slow interface + +from threading import Thread, Event + +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' + +stim = TestSocket(CAN) +isocan = TestSocket(CAN) +stim.pair(isocan) + +bg_frame = CAN(identifier=0x456, data=dhex("01 02 03")) +ff_frame = CAN(identifier=0x241, data=dhex("10 13 62 F1 90 41 42 43")) +cf1_frame = CAN(identifier=0x241, data=dhex("21 44 45 46 47 48 49 4A")) +cf2_frame = CAN(identifier=0x241, data=dhex("22 4B 4C 4D 4E 4F 50 00")) + +bg_count = 2000 # Large number of frames to stress the ISOTPSoftSocket implementation + +for _ in range(100): + _ = stim.send(bg_frame) + +stim.send(ff_frame) + +for _ in range(bg_count): + _ = stim.send(bg_frame) + +stim.send(cf1_frame) +stim.send(cf2_frame) + +with isocan, stim, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + pkts = sock.sniff(count=1, timeout=10) + # Stop TimeoutScheduler while sockets are still open. 
+ _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert len(pkts) == 1, "MF response not received due to background traffic" +assert pkts[0].data == response_data + += ISOTPSoftSocket MF response with delayed CFs and background traffic + +from threading import Thread, Event + +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' + +with TestSocket(CAN) as stim, TestSocket(CAN) as isocan, \ + ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + stim.pair(isocan) + stop_traffic = Event() + def bg_traffic(): + bg_frame = CAN(identifier=0x456, data=dhex("01 02 03")) + while not stop_traffic.is_set(): + try: + stim.send(bg_frame) + except Exception: + break + time.sleep(0.001) + def delayed_response(): + time.sleep(0.05) + sock.impl.rx_tx_poll_rate = 10 + stim.send(CAN(identifier=0x241, data=dhex("10 13 62 F1 90 41 42 43"))) + time.sleep(0.01) + stim.send(CAN(identifier=0x241, data=dhex("21 44 45 46 47 48 49 4A"))) + time.sleep(0.01) + stim.send(CAN(identifier=0x241, data=dhex("22 4B 4C 4D 4E 4F 50 00"))) + traffic_thread = Thread(target=bg_traffic) + traffic_thread.start() + resp_thread = Thread(target=delayed_response) + resp_thread.start() + pkts = sock.sniff(count=1, timeout=5) + stop_traffic.set() + traffic_thread.join(timeout=5) + resp_thread.join(timeout=5) + assert not traffic_thread.is_alive(), "traffic_thread still alive" + assert not resp_thread.is_alive(), "resp_thread still alive" + # Stop TimeoutScheduler while sockets are still open. 
+ _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert len(pkts) == 1, "MF response not received with delayed CFs and slow poll rate" +assert pkts[0].data == response_data + = ISOTPSoftSocket sniff msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') diff --git a/test/testsocket.py b/test/testsocket.py index d1a90a4da51..1ecd79f4fec 100644 --- a/test/testsocket.py +++ b/test/testsocket.py @@ -178,6 +178,172 @@ def recv(self, x=MTU, **kwargs): return super(UnstableSocket, self).recv(x, **kwargs) +class SlowTestSocket(TestSocket): + """A TestSocket that simulates the mux/throttle behavior of + PythonCANSocket on a slow serial interface (like slcan). + + Frames sent to this socket go into an intermediate serial buffer. + They only become visible to recv()/select() after mux() moves + them to the rx ObjectPipe. + + Key parameters model the real slcan timing bottleneck: + - frame_delay: per-frame serial read time (~2-3ms on real slcan) + - serial_timeout: blocking wait when serial buffer is empty. + Real python-can slcan uses serial.Serial(timeout=0.1), so + bus.recv(timeout=0) blocks for 100ms when buffer is empty. + - read_time_limit: max time spent reading per mux call, matching + SocketMapper.READ_BUS_TIME_LIMIT in production code. + + can_filters: Optional list of CAN identifiers for per-socket + filtering. When set, mux reads all frames but only delivers + matching ones, like SocketMapper.distribute() + _matches_filters. + """ + + def __init__(self, basecls=None, frame_delay=0.0002, + mux_throttle=0.001, can_filters=None, + serial_timeout=0.0, read_time_limit=0.0, + interface_name="slcan"): + # type: (Optional[Type[Packet]], float, float, Optional[List[int]], float, float, str) -> None # noqa: E501 + """ + :param frame_delay: Simulated per-frame serial read time (seconds). + :param mux_throttle: Minimum time between mux calls (default 1ms). 
+ :param can_filters: Optional list of CAN identifiers for filtering. + :param serial_timeout: Time to block when serial buffer is empty + (models python-can slcan serial.Serial(timeout=0.1)). + Set to 0.1 to reproduce real slcan behavior. + :param read_time_limit: Max time per mux read pass (seconds). + Set to 0.02 to match SocketMapper.READ_BUS_TIME_LIMIT. + When 0 (default), no time limit is applied. + :param interface_name: Simulated interface name (default "slcan"). + Used in test descriptions to identify the adapter type. + """ + super(SlowTestSocket, self).__init__(basecls) + self.interface_name = interface_name + from collections import deque + self._serial_buffer = deque() # type: deque[bytes] + self._serial_lock = Lock() + self._last_mux = 0.0 + self._frame_delay = frame_delay + self._mux_throttle = mux_throttle + self._can_filters = can_filters + self._serial_timeout = serial_timeout + self._read_time_limit = read_time_limit + self._real_ins = self.ins + self.ins = _SlowPipeWrapper(self) # type: ignore[assignment] + + @staticmethod + def _extract_can_id(frame): + # type: (bytes) -> int + """Extract CAN identifier from raw CAN frame bytes.""" + import struct + if len(frame) < 4: + return -1 + return int(struct.unpack('!I', frame[:4])[0] & 0x1FFFFFFF) + + def _mux(self): + # type: () -> None + """Move frames from serial buffer to rx ObjectPipe. + + Models the real PythonCANSocket read path: + 1. read_bus(): loop calling bus.recv(timeout=0) — each call + takes frame_delay when data is available, or serial_timeout + when the buffer is empty (modeling slcan serial timeout). + 2. distribute(): deliver matching frames to the ObjectPipe. + + With read_time_limit > 0, the read loop stops after that many + seconds (matching SocketMapper.READ_BUS_TIME_LIMIT). 
+ """ + now = time.monotonic() + if now - self._last_mux < self._mux_throttle: + return + + # Phase 1: read_bus — read frames from serial buffer + msgs = [] + deadline = time.monotonic() + self._read_time_limit \ + if self._read_time_limit > 0 else None + while True: + if self.closed: + break + with self._serial_lock: + if self._serial_buffer: + frame = self._serial_buffer.popleft() + else: + frame = None + if frame is None: + # Empty buffer: model the serial timeout blocking + if self._serial_timeout > 0: + time.sleep(self._serial_timeout) + break + if self._frame_delay > 0: + time.sleep(self._frame_delay) + msgs.append(frame) + if deadline and time.monotonic() >= deadline: + break + + # Phase 2: distribute — apply per-socket filtering + for frame in msgs: + if self._can_filters is not None: + can_id = self._extract_can_id(frame) + if can_id not in self._can_filters: + continue + self._real_ins.send(frame) + + self._last_mux = time.monotonic() + + def recv_raw(self, x=MTU): + # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] # noqa: E501 + """Read from the rx ObjectPipe (populated by mux via select).""" + return self.basecls, self._real_ins.recv(0), time.time() + + def send(self, x): + # type: (Packet) -> int + if self._frame_delay > 0: + time.sleep(self._frame_delay) + return super(SlowTestSocket, self).send(x) + + @staticmethod + def select(sockets, remain=conf.recv_poll_rate): + # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] + for s in sockets: + if isinstance(s, SlowTestSocket): + s._mux() + return select_objects(sockets, remain) + + def close(self): + # type: () -> None + self.ins = self._real_ins + super(SlowTestSocket, self).close() + + +class _SlowPipeWrapper: + """Wrapper that intercepts send() to route into serial buffer.""" + def __init__(self, owner): + # type: (SlowTestSocket) -> None + self._owner = owner + + def send(self, data): + # type: (bytes) -> None + with self._owner._serial_lock: + 
self._owner._serial_buffer.append(data) + + def recv(self, timeout=0): + # type: (int) -> Optional[bytes] + return self._owner._real_ins.recv(timeout) + + def fileno(self): + # type: () -> int + return self._owner._real_ins.fileno() + + def close(self): + # type: () -> None + self._owner._real_ins.close() + + @property + def closed(self): + # type: () -> bool + return bool(self._owner._real_ins.closed) # type: ignore[attr-defined] + + def cleanup_testsockets(): # type: () -> None """ From 9af046222c0a6848b7ff10f1286f7ff5f04e4c0d Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 13:52:15 -0500 Subject: [PATCH 5/9] isotpsoft: make TimeoutScheduler._task a daemon thread Make this timeout scheduler a daemon thread. This should fix the python 3.13 tox failures on windows. --- scapy/contrib/isotp/isotp_soft_socket.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 5cfb1689308..07e45e86b66 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -256,6 +256,7 @@ def schedule(cls, timeout, callback): # Start the scheduling thread if it is not started already if cls._thread is None: t = Thread(target=cls._task, name="TimeoutScheduler._task") + t.daemon = True must_interrupt = False cls._thread = t cls._event.clear() From e04a616d65e6dbb875dad19ecd8d48edddcfe0ef Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 14:05:58 -0500 Subject: [PATCH 6/9] python-can, mux: special case for slcan: drop bus filters --- scapy/contrib/cansocket_python_can.py | 32 +++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py index baf4a6cbd74..b4710bdb8a8 100644 --- a/scapy/contrib/cansocket_python_can.py +++ b/scapy/contrib/cansocket_python_can.py @@ -16,6 +16,7 @@ from functools import reduce from operator import 
add + from collections import deque from scapy.config import conf @@ -161,13 +162,36 @@ def register(self, socket, *args, **kwargs): if k in self.pool: t = self.pool[k] t.sockets.append(socket) - filters = [s.filters for s in t.sockets - if s.filters is not None] - if filters: - t.bus.set_filters(reduce(add, filters)) + # Update bus-level filters to the union of all sockets' + # filters. For non-slcan interfaces (socketcan, kvaser, + # vector), this enables efficient hardware/kernel + # filtering. For slcan, the bus filters were already + # cleared on creation, so this is a no-op (all sockets + # on slcan share the unfiltered bus). + if not k.lower().startswith('slcan'): + filters = [s.filters for s in t.sockets + if s.filters is not None] + if filters: + t.bus.set_filters(reduce(add, filters)) socket.name = k else: bus = can_Bus(*args, **kwargs) + # Serial interfaces like slcan only do software + # filtering inside BusABC.recv(): the recv loop reads + # one frame, finds it doesn't match, and returns + # None -- silently consuming serial bandwidth without + # returning the frame to the mux. This starves the + # mux on busy buses. + # + # For slcan, clear the filters from the bus so that + # bus.recv() returns ALL frames. Per-socket filtering + # in distribute() via _matches_filters() handles + # delivery. Other interfaces (socketcan, kvaser, + # vector, candle) perform efficient hardware/kernel + # filtering and should keep their bus-level filters. 
+ if kwargs.get('can_filters') and \ + k.lower().startswith('slcan'): + bus.set_filters(None) socket.name = k self.pool[k] = SocketMapper(bus, [socket]) From b864565f429df505677c5486435cfe65f9199c16 Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 14:08:40 -0500 Subject: [PATCH 7/9] isotpsoft: schedule timeouts to work with slow (slcan) interfaces, make close and timeouts more robust --- scapy/contrib/isotp/isotp_soft_socket.py | 68 ++++++++++++++++++++---- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 07e45e86b66..f80da8ff154 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -556,6 +556,7 @@ def __init__(self, self.tx_handle = TimeoutScheduler.schedule( self.rx_tx_poll_rate, self._send) self.last_rx_call = 0.0 + self.rx_start_time = 0.0 def failure_analysis(self): # type: () -> None @@ -598,10 +599,17 @@ def _get_padding_size(pl_size): def can_recv(self): # type: () -> None self.last_rx_call = TimeoutScheduler._time() - if self.can_socket.select([self.can_socket], 0): - pkt = self.can_socket.recv() - if pkt: - self.on_can_recv(pkt) + try: + while self.can_socket.select([self.can_socket], 0): + pkt = self.can_socket.recv() + if pkt: + self.on_can_recv(pkt) + else: + break + except Exception: + if not self.closed: + log_isotp.warning("Error in can_recv: %s", + traceback.format_exc()) if not self.closed and not self.can_socket.closed: if self.can_socket.select([self.can_socket], 0): poll_time = 0.0 @@ -649,13 +657,46 @@ def close(self): self.tx_handle.cancel() except Scapy_Exception: pass + if self.rx_timeout_handle is not None: + try: + self.rx_timeout_handle.cancel() + except Scapy_Exception: + pass + if self.tx_timeout_handle is not None: + try: + self.tx_timeout_handle.cancel() + except Scapy_Exception: + pass + try: + self.rx_queue.close() + except (OSError, EOFError): + pass + try: + 
self.tx_queue.close() + except (OSError, EOFError): + pass def _rx_timer_handler(self): # type: () -> None """Method called every time the rx_timer times out, due to the peer not sending a consecutive frame within the expected time window""" + if self.closed: + return + if self.rx_state == ISOTP_WAIT_DATA: + # On slow serial interfaces (slcan), the mux reads frames + # from an OS serial buffer that may contain hundreds of + # background CAN frames. Consecutive Frames from the ECU + # are queued behind this backlog and can take several + # seconds to reach the ISOTP state machine. Extend the + # timeout up to 10 × cf_timeout to give the mux enough + # time to drain the backlog. + total_wait = TimeoutScheduler._time() - self.rx_start_time + if total_wait < self.cf_timeout * 10: + self.rx_timeout_handle = TimeoutScheduler.schedule( + self.cf_timeout, self._rx_timer_handler) + return # we did not get new data frames in time. # reset rx state self.rx_state = ISOTP_IDLE @@ -668,6 +709,9 @@ def _tx_timer_handler(self): two situations: either a Flow Control frame was not received in time, or the Separation Time Min is expired and a new frame must be sent.""" + if self.closed: + return + if (self.tx_state == ISOTP_WAIT_FC or self.tx_state == ISOTP_WAIT_FIRST_FC): # we did not get any flow control frame in time @@ -872,6 +916,7 @@ def _recv_ff(self, data, ts): # initial setup for this pdu reception self.rx_sn = 1 self.rx_state = ISOTP_WAIT_DATA + self.rx_start_time = TimeoutScheduler._time() # no creation of flow control frames if not self.listen_only: @@ -1000,11 +1045,16 @@ def begin_send(self, x): def _send(self): # type: () -> None - if self.tx_state == ISOTP_IDLE: - if select_objects([self.tx_queue], 0): - pkt = self.tx_queue.recv() - if pkt: - self.begin_send(pkt) + try: + if self.tx_state == ISOTP_IDLE: + if select_objects([self.tx_queue], 0): + pkt = self.tx_queue.recv() + if pkt: + self.begin_send(pkt) + except Exception: + if not self.closed: + 
log_isotp.warning("Error in _send: %s", + traceback.format_exc()) if not self.closed: self.tx_handle = TimeoutScheduler.schedule( From e6d6495e21d99d81877f9e3c93aa8d3f660f887d Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 14:10:16 -0500 Subject: [PATCH 8/9] python-can, mux: limit time under locks, optimize data receive latency --- scapy/contrib/cansocket_python_can.py | 58 ++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py index b4710bdb8a8..340104a4abb 100644 --- a/scapy/contrib/cansocket_python_can.py +++ b/scapy/contrib/cansocket_python_can.py @@ -56,14 +56,33 @@ def __init__(self, bus, sockets): """ self.bus = bus self.sockets = sockets - - def mux(self): - # type: () -> None - """Multiplexer function. Tries to receive from its python-can bus - object. If a message is received, this message gets forwarded to - all receive queues of the SocketWrapper objects. + self.closing = False + + # Maximum time (seconds) to spend reading frames in one read_bus() + # call. On serial interfaces (slcan) the final bus.recv(timeout=0) + # when the buffer is empty blocks for the serial port's read timeout + # (typically 100ms in python-can's slcan driver). During that block + # the TimeoutScheduler thread cannot run any other callbacks. By + # capping total read time, we ensure the scheduler stays responsive + # even on slow serial interfaces with heavy background traffic. + READ_BUS_TIME_LIMIT = 0.020 # 20 ms + + def read_bus(self): + # type: () -> List[can_Message] + """Read available frames from the bus, up to READ_BUS_TIME_LIMIT. + + On slow serial interfaces (slcan), bus.recv(timeout=0) can + block for ~100ms when the serial buffer is empty (python-can's + slcan serial timeout). This method limits total time spent + reading so the TimeoutScheduler thread stays responsive. 
+ + This method intentionally does NOT hold pool_mutex so that + concurrent send() calls are not blocked during the serial I/O. """ + if self.closing: + return [] msgs = [] + deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT while True: try: msg = self.bus.recv(timeout=0) @@ -71,9 +90,17 @@ def mux(self): break else: msgs.append(msg) + if time.monotonic() >= deadline: + break except Exception as e: - warning("[MUX] python-can exception caught: %s" % e) - + if not self.closing: + warning("[MUX] python-can exception caught: %s" % e) + break + return msgs + + def distribute(self, msgs): + # type: (List[can_Message]) -> None + """Distribute received messages to all subscribed sockets.""" for sock in self.sockets: with sock.lock: for msg in msgs: @@ -133,9 +160,17 @@ def multiplex_rx_packets(self): # this object is singleton and all python-CAN sockets are using # the same instance and locking the same locks. return + # Snapshot pool entries under the lock, then read from each bus + # WITHOUT holding pool_mutex. On slow serial interfaces (slcan) + # bus.recv(timeout=0) can take ~2-3ms per frame; holding the + # mutex during those reads would block send() for the entire + # duration. with self.pool_mutex: - for t in self.pool.values(): - t.mux() + mappers = list(self.pool.values()) + for mapper in mappers: + msgs = mapper.read_bus() + if msgs: + mapper.distribute(msgs) self.last_call = time.monotonic() def register(self, socket, *args, **kwargs): @@ -212,6 +247,7 @@ def unregister(self, socket): t = self.pool[socket.name] t.sockets.remove(socket) if not t.sockets: + t.closing = True t.bus.shutdown() del self.pool[socket.name] except KeyError: @@ -346,6 +382,7 @@ def select(sockets, remain=conf.recv_poll_rate): :returns: an array of sockets that were selected and the function to be called next to get the packets (i.g. 
recv) """ + SocketsPool.multiplex_rx_packets() ready_sockets = \ [s for s in sockets if isinstance(s, PythonCANSocket) and len(s.can_iface.rx_queue)] @@ -357,7 +394,6 @@ def select(sockets, remain=conf.recv_poll_rate): # yield this thread to avoid starvation time.sleep(0) - SocketsPool.multiplex_rx_packets() return cast(List[SuperSocket], ready_sockets) def close(self): From 1cfc0671517918dd2db84169f41bf51b9827c44a Mon Sep 17 00:00:00 2001 From: Ben Gardiner Date: Fri, 27 Feb 2026 14:10:54 -0500 Subject: [PATCH 9/9] isotpsoft: optimize for slow (slcan) interfaces: don't call select when the internal state will do --- scapy/contrib/isotp/isotp_soft_socket.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index f80da8ff154..e6baa1dbbf7 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -611,7 +611,14 @@ def can_recv(self): log_isotp.warning("Error in can_recv: %s", traceback.format_exc()) if not self.closed and not self.can_socket.closed: - if self.can_socket.select([self.can_socket], 0): + # Determine poll_time from ISOTP state only. + # Avoid calling select() here — on slow serial interfaces + # (slcan), each select() triggers a mux() call that reads + # N frames at ~2.5ms each, wasting time that could be spent + # processing frames already in the rx_queue. + if self.rx_state == ISOTP_WAIT_DATA or \ + self.tx_state == ISOTP_WAIT_FC or \ + self.tx_state == ISOTP_WAIT_FIRST_FC: poll_time = 0.0 else: poll_time = self.rx_tx_poll_rate