diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py index baf4a6cbd74..340104a4abb 100644 --- a/scapy/contrib/cansocket_python_can.py +++ b/scapy/contrib/cansocket_python_can.py @@ -16,6 +16,7 @@ from functools import reduce from operator import add + from collections import deque from scapy.config import conf @@ -55,14 +56,33 @@ def __init__(self, bus, sockets): """ self.bus = bus self.sockets = sockets - - def mux(self): - # type: () -> None - """Multiplexer function. Tries to receive from its python-can bus - object. If a message is received, this message gets forwarded to - all receive queues of the SocketWrapper objects. + self.closing = False + + # Maximum time (seconds) to spend reading frames in one read_bus() + # call. On serial interfaces (slcan) the final bus.recv(timeout=0) + # when the buffer is empty blocks for the serial port's read timeout + # (typically 100ms in python-can's slcan driver). During that block + # the TimeoutScheduler thread cannot run any other callbacks. By + # capping total read time, we ensure the scheduler stays responsive + # even on slow serial interfaces with heavy background traffic. + READ_BUS_TIME_LIMIT = 0.020 # 20 ms + + def read_bus(self): + # type: () -> List[can_Message] + """Read available frames from the bus, up to READ_BUS_TIME_LIMIT. + + On slow serial interfaces (slcan), bus.recv(timeout=0) can + block for ~100ms when the serial buffer is empty (python-can's + slcan serial timeout). This method limits total time spent + reading so the TimeoutScheduler thread stays responsive. + + This method intentionally does NOT hold pool_mutex so that + concurrent send() calls are not blocked during the serial I/O. """ + if self.closing: + return [] msgs = [] + deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT while True: try: msg = self.bus.recv(timeout=0) @@ -70,9 +90,17 @@ def mux(self): break else: msgs.append(msg) + if time.monotonic() >= deadline: + break except Exception as e: - warning("[MUX] python-can exception caught: %s" % e) - + if not self.closing: + warning("[MUX] python-can exception caught: %s" % e) + break + return msgs + + def distribute(self, msgs): + # type: (List[can_Message]) -> None + """Distribute received messages to all subscribed sockets.""" for sock in self.sockets: with sock.lock: for msg in msgs: @@ -132,9 +160,17 @@ def multiplex_rx_packets(self): # this object is singleton and all python-CAN sockets are using # the same instance and locking the same locks. return + # Snapshot pool entries under the lock, then read from each bus + # WITHOUT holding pool_mutex. On slow serial interfaces (slcan) + # bus.recv(timeout=0) can take ~2-3ms per frame; holding the + # mutex during those reads would block send() for the entire + # duration. with self.pool_mutex: - for t in self.pool.values(): - t.mux() + mappers = list(self.pool.values()) + for mapper in mappers: + msgs = mapper.read_bus() + if msgs: + mapper.distribute(msgs) self.last_call = time.monotonic() def register(self, socket, *args, **kwargs): @@ -161,13 +197,36 @@ def register(self, socket, *args, **kwargs): if k in self.pool: t = self.pool[k] t.sockets.append(socket) - filters = [s.filters for s in t.sockets - if s.filters is not None] - if filters: - t.bus.set_filters(reduce(add, filters)) + # Update bus-level filters to the union of all sockets' + # filters. For non-slcan interfaces (socketcan, kvaser, + # vector), this enables efficient hardware/kernel + # filtering. For slcan, the bus filters were already + # cleared on creation, so this is a no-op (all sockets + # on slcan share the unfiltered bus). + if not k.lower().startswith('slcan'): + filters = [s.filters for s in t.sockets + if s.filters is not None] + if filters: + t.bus.set_filters(reduce(add, filters)) socket.name = k else: bus = can_Bus(*args, **kwargs) + # Serial interfaces like slcan only do software + # filtering inside BusABC.recv(): the recv loop reads + # one frame, finds it doesn't match, and returns + # None -- silently consuming serial bandwidth without + # returning the frame to the mux. This starves the + # mux on busy buses. + # + # For slcan, clear the filters from the bus so that + # bus.recv() returns ALL frames. Per-socket filtering + # in distribute() via _matches_filters() handles + # delivery. Other interfaces (socketcan, kvaser, + # vector, candle) perform efficient hardware/kernel + # filtering and should keep their bus-level filters. + if kwargs.get('can_filters') and \ + k.lower().startswith('slcan'): + bus.set_filters(None) socket.name = k self.pool[k] = SocketMapper(bus, [socket]) @@ -188,6 +247,7 @@ def unregister(self, socket): t = self.pool[socket.name] t.sockets.remove(socket) if not t.sockets: + t.closing = True t.bus.shutdown() del self.pool[socket.name] except KeyError: @@ -322,6 +382,7 @@ def select(sockets, remain=conf.recv_poll_rate): :returns: an array of sockets that were selected and the function to be called next to get the packets (i.g. recv) """ + SocketsPool.multiplex_rx_packets() ready_sockets = \ [s for s in sockets if isinstance(s, PythonCANSocket) and len(s.can_iface.rx_queue)] @@ -333,7 +394,6 @@ def select(sockets, remain=conf.recv_poll_rate): # yield this thread to avoid starvation time.sleep(0) - SocketsPool.multiplex_rx_packets() return cast(List[SuperSocket], ready_sockets) def close(self): diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 4182d445336..e6baa1dbbf7 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -166,7 +166,8 @@ def __init__(self, def close(self): # type: () -> None if not self.closed: - self.impl.close() + if hasattr(self, "impl"): + self.impl.close() self.closed = True def failure_analysis(self): @@ -202,8 +203,8 @@ def recv(self, x=0xffff, **kwargs): return msg @staticmethod - def select(sockets, remain=None): - # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] + def select(sockets, remain=None): # type: ignore[override] + # type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] # noqa: E501 """This function is called during sendrecv() routine to wait for sockets to be ready to receive """ @@ -214,8 +215,12 @@ def select(sockets, remain=None): ready_pipes = select_objects(obj_pipes, remain) - return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and - not x.closed and x.impl.rx_queue in ready_pipes] + result: List[Union[SuperSocket, ObjectPipe[Any]]] = [ + x for x in sockets if isinstance(x, ISOTPSoftSocket) and + not x.closed and x.impl.rx_queue in ready_pipes] + result += [x for x in sockets if isinstance(x, ObjectPipe) and + x in ready_pipes] + return result class TimeoutScheduler: @@ -251,6 +256,7 @@ def schedule(cls, timeout, callback): # Start the scheduling thread if it is not started already if cls._thread is None: t = Thread(target=cls._task, name="TimeoutScheduler._task") + t.daemon = True must_interrupt = False cls._thread = t cls._event.clear() @@ -550,6 +556,7 @@ def __init__(self, self.tx_handle = TimeoutScheduler.schedule( self.rx_tx_poll_rate, self._send) self.last_rx_call = 0.0 + self.rx_start_time = 0.0 def failure_analysis(self): # type: () -> None @@ -592,12 +599,26 @@ def _get_padding_size(pl_size): def can_recv(self): # type: () -> None self.last_rx_call = TimeoutScheduler._time() - if self.can_socket.select([self.can_socket], 0): - pkt = self.can_socket.recv() - if pkt: - self.on_can_recv(pkt) + try: + while self.can_socket.select([self.can_socket], 0): + pkt = self.can_socket.recv() + if pkt: + self.on_can_recv(pkt) + else: + break + except Exception: + if not self.closed: + log_isotp.warning("Error in can_recv: %s", + traceback.format_exc()) if not self.closed and not self.can_socket.closed: - if self.can_socket.select([self.can_socket], 0): + # Determine poll_time from ISOTP state only. + # Avoid calling select() here — on slow serial interfaces + # (slcan), each select() triggers a mux() call that reads + # N frames at ~2.5ms each, wasting time that could be spent + # processing frames already in the rx_queue. + if self.rx_state == ISOTP_WAIT_DATA or \ + self.tx_state == ISOTP_WAIT_FC or \ + self.tx_state == ISOTP_WAIT_FIRST_FC: poll_time = 0.0 else: poll_time = self.rx_tx_poll_rate @@ -643,13 +664,46 @@ def close(self): self.tx_handle.cancel() except Scapy_Exception: pass + if self.rx_timeout_handle is not None: + try: + self.rx_timeout_handle.cancel() + except Scapy_Exception: + pass + if self.tx_timeout_handle is not None: + try: + self.tx_timeout_handle.cancel() + except Scapy_Exception: + pass + try: + self.rx_queue.close() + except (OSError, EOFError): + pass + try: + self.tx_queue.close() + except (OSError, EOFError): + pass def _rx_timer_handler(self): # type: () -> None """Method called every time the rx_timer times out, due to the peer not sending a consecutive frame within the expected time window""" + if self.closed: + return + if self.rx_state == ISOTP_WAIT_DATA: + # On slow serial interfaces (slcan), the mux reads frames + # from an OS serial buffer that may contain hundreds of + # background CAN frames. Consecutive Frames from the ECU + # are queued behind this backlog and can take several + # seconds to reach the ISOTP state machine. Extend the + # timeout up to 10 × cf_timeout to give the mux enough + # time to drain the backlog. + total_wait = TimeoutScheduler._time() - self.rx_start_time + if total_wait < self.cf_timeout * 10: + self.rx_timeout_handle = TimeoutScheduler.schedule( + self.cf_timeout, self._rx_timer_handler) + return # we did not get new data frames in time. # reset rx state self.rx_state = ISOTP_IDLE @@ -662,6 +716,9 @@ def _tx_timer_handler(self): two situations: either a Flow Control frame was not received in time, or the Separation Time Min is expired and a new frame must be sent.""" + if self.closed: + return + if (self.tx_state == ISOTP_WAIT_FC or self.tx_state == ISOTP_WAIT_FIRST_FC): # we did not get any flow control frame in time @@ -866,6 +923,7 @@ def _recv_ff(self, data, ts): # initial setup for this pdu reception self.rx_sn = 1 self.rx_state = ISOTP_WAIT_DATA + self.rx_start_time = TimeoutScheduler._time() # no creation of flow control frames if not self.listen_only: @@ -994,11 +1052,16 @@ def begin_send(self, x): def _send(self): # type: () -> None - if self.tx_state == ISOTP_IDLE: - if select_objects([self.tx_queue], 0): - pkt = self.tx_queue.recv() - if pkt: - self.begin_send(pkt) + try: + if self.tx_state == ISOTP_IDLE: + if select_objects([self.tx_queue], 0): + pkt = self.tx_queue.recv() + if pkt: + self.begin_send(pkt) + except Exception: + if not self.closed: + log_isotp.warning("Error in _send: %s", + traceback.format_exc()) if not self.closed: self.tx_handle = TimeoutScheduler.schedule( diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index 2e7bdfaeccf..f112563d62a 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -10,7 +10,7 @@ from io import BytesIO from scapy.layers.can import * from scapy.contrib.isotp import * from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler -from test.testsocket import TestSocket, cleanup_testsockets +from test.testsocket import TestSocket, SlowTestSocket, cleanup_testsockets with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) @@ -954,6 +954,204 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s assert rx2 is None += ISOTPSoftSocket select returns control ObjectPipe + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") +close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x123, 0x321) as sock: + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result + +close_pipe.close() + += ISOTPSoftSocket select returns control ObjectPipe alongside ready rx_queue + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") +close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + sock.impl.rx_queue.send((b'\x62\xF1\x90\x41\x42\x43', 0.0)) + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result +assert sock in result + +close_pipe.close() + += ISOTPSoftSocket sr1 SF request with MF response threaded + +from threading import Thread + +request = ISOTP(b'\x22\xF1\x90') +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' +response_msg = ISOTP(response_data) + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x641, 0x241) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x241, 0x641) as sock_rx: + isocan_rx.pair(isocan_tx) + def responder(): + sniffed = sock_rx.sniff(count=1, timeout=5) + if sniffed: + sock_rx.send(response_msg) + resp_thread = Thread(target=responder, daemon=True) + resp_thread.start() + time.sleep(0.1) + rx = sock_tx.sr1(request, timeout=5, verbose=False, threaded=True) + resp_thread.join(timeout=5) + assert not resp_thread.is_alive(), "resp_thread still alive" + # Stop TimeoutScheduler while sockets are still open to avoid + # callbacks crashing on closed sockets and writing to stderr. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert rx is not None +assert rx.data == response_data + += ISOTPSoftSocket sr1 timeout with threaded=True + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + elapsed = time.time() - start + # Stop TimeoutScheduler while sockets are still open. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert rx2 is None +assert elapsed < 5 + += ISOTPSoftSocket sr1 timeout with threaded=True and background traffic + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + stop_traffic = Event() + def bg_traffic(): + while not stop_traffic.is_set(): + try: + isocan_rx.send(CAN(identifier=0x456, data=dhex("01 02 03"))) + except Exception: + break + time.sleep(0.01) + traffic_thread = Thread(target=bg_traffic, daemon=True) + traffic_thread.start() + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + elapsed = time.time() - start + stop_traffic.set() + traffic_thread.join(timeout=5) + assert not traffic_thread.is_alive(), "traffic_thread still alive" + # Stop TimeoutScheduler while sockets are still open. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert rx2 is None +assert elapsed < 5 + += ISOTPSoftSocket sr1 SF request with MF response threaded and background traffic on slow interface + +from threading import Thread, Event + +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' + +stim = TestSocket(CAN) +isocan = TestSocket(CAN) +stim.pair(isocan) + +bg_frame = CAN(identifier=0x456, data=dhex("01 02 03")) +ff_frame = CAN(identifier=0x241, data=dhex("10 13 62 F1 90 41 42 43")) +cf1_frame = CAN(identifier=0x241, data=dhex("21 44 45 46 47 48 49 4A")) +cf2_frame = CAN(identifier=0x241, data=dhex("22 4B 4C 4D 4E 4F 50 00")) + +bg_count = 2000 # Large number of frames to stress the ISOTPSoftSocket implementation + +for _ in range(100): + _ = stim.send(bg_frame) + +stim.send(ff_frame) + +for _ in range(bg_count): + _ = stim.send(bg_frame) + +stim.send(cf1_frame) +stim.send(cf2_frame) + +with isocan, stim, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + pkts = sock.sniff(count=1, timeout=10) + # Stop TimeoutScheduler while sockets are still open. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert len(pkts) == 1, "MF response not received due to background traffic" +assert pkts[0].data == response_data + += ISOTPSoftSocket MF response with delayed CFs and background traffic + +from threading import Thread, Event + +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' + +with TestSocket(CAN) as stim, TestSocket(CAN) as isocan, \ + ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + stim.pair(isocan) + stop_traffic = Event() + def bg_traffic(): + bg_frame = CAN(identifier=0x456, data=dhex("01 02 03")) + while not stop_traffic.is_set(): + try: + stim.send(bg_frame) + except Exception: + break + time.sleep(0.001) + def delayed_response(): + time.sleep(0.05) + sock.impl.rx_tx_poll_rate = 10 + stim.send(CAN(identifier=0x241, data=dhex("10 13 62 F1 90 41 42 43"))) + time.sleep(0.01) + stim.send(CAN(identifier=0x241, data=dhex("21 44 45 46 47 48 49 4A"))) + time.sleep(0.01) + stim.send(CAN(identifier=0x241, data=dhex("22 4B 4C 4D 4E 4F 50 00"))) + traffic_thread = Thread(target=bg_traffic) + traffic_thread.start() + resp_thread = Thread(target=delayed_response) + resp_thread.start() + pkts = sock.sniff(count=1, timeout=5) + stop_traffic.set() + traffic_thread.join(timeout=5) + resp_thread.join(timeout=5) + assert not traffic_thread.is_alive(), "traffic_thread still alive" + assert not resp_thread.is_alive(), "resp_thread still alive" + # Stop TimeoutScheduler while sockets are still open. + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + +assert len(pkts) == 1, "MF response not received with delayed CFs and slow poll rate" +assert pkts[0].data == response_data + = ISOTPSoftSocket sniff msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') @@ -1261,12 +1459,214 @@ with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241 res.show() assert res.data == dhex("01 02 03 04 05 06 07 08 09") + ++ MF response via sr1() cartesian product tests +# Background traffic from pcap: 3 periodic IDs (0x062, 0x024, 0x039) every +# 10ms, plus a burst of 9 additional IDs every 100ms. +# ECU response latency: ~0.6ms after SF request (from pcap frame 385). +# CF timing after FC: CF1 +8ms, CF2 +10ms, CF3 +10ms (from pcap). +# Expected ISOTP data: "62 00 01 flag{UDS_DATA_READ}" (22 bytes). +# +# Cartesian product dimensions: +# threaded: {False, True} - sr1() threading mode +# can_filters: {[0x7eb], None} - per-socket filtering vs. no filtering +# adapter: {limited (slcan-like), unlimited (candle-like)} +# +# slcan model parameters (from real hardware testing): +# frame_delay=0.0025: ~2.5ms per serial read at 115200 baud +# serial_timeout=0.1: python-can slcan Serial(timeout=0.1) blocks 100ms +# when serial buffer is empty +# read_time_limit=0.02: SocketMapper.READ_BUS_TIME_LIMIT = 20ms caps +# total read time per mux call +# prefill_frames=200: OS serial buffer backlog from busy CAN bus +# +# All tests use retry=0, timeout=1.0. All should PASS with the fix +# (can_filters stripped from raw Bus, per-socket filtering in mux, +# read_bus time-limited to avoid TimeoutScheduler thread starvation). + += MF response helper setup for cartesian product tests + +from threading import Thread, Event + +def run_mf_response_test(frame_delay, mux_throttle, filters_kwarg, threaded, + prefill_frames=0, serial_timeout=0.0, + read_time_limit=0.0, interface_name="slcan"): + import time as _time + from threading import Thread as _Thread, Event as _Event + from scapy.layers.can import CAN as _CAN + from scapy.contrib.isotp import ISOTP as _ISOTP + from scapy.contrib.isotp.isotp_soft_socket import ISOTPSoftSocket as _ISOTPSoftSocket + from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler as _TimeoutScheduler + from test.testsocket import TestSocket as _TestSocket, SlowTestSocket as _SlowTestSocket + _dhex = bytes.fromhex + response_data = _dhex("620001666c61677b5544535f444154415f524541447d") + bg_periodic = [0x062, 0x024, 0x039] + bg_burst = [0x1d3, 0x024, 0x039, 0x077, 0x098, 0x150, 0x1a7, 0x1b8, 0x1bb] + if frame_delay > 0: + sock_cls = _SlowTestSocket + sock_kwargs = dict(frame_delay=frame_delay, mux_throttle=mux_throttle, + serial_timeout=serial_timeout, + read_time_limit=read_time_limit, + interface_name=interface_name, + **filters_kwarg) + else: + sock_cls = _TestSocket + sock_kwargs = {} + with sock_cls(_CAN, **sock_kwargs) as isocan, \ + _TestSocket(_CAN) as ecu_mon, \ + _ISOTPSoftSocket(isocan, tx_id=0x7e3, rx_id=0x7eb) as sock: + with _TestSocket(_CAN) as stim: + stim.pair(isocan) + isocan.pair(ecu_mon) + # Pre-fill the serial buffer with background frames to + # simulate a real slcan adapter that has been connected to + # a busy CAN bus. On real hardware the OS serial buffer + # accumulates hundreds of frames before the ISOTP exchange. + for _ in range(prefill_frames): + bid = bg_periodic[_ % len(bg_periodic)] + stim.send(_CAN(identifier=bid, data=bytes(8))) + fc_received = _Event() + stop = _Event() + bg_cycle = [0] + def bg_generator(): + while not stop.is_set(): + for bid in bg_periodic: + if stop.is_set(): + return + stim.send(_CAN(identifier=bid, data=bytes(8))) + bg_cycle[0] += 1 + if bg_cycle[0] % 10 == 0: + for bid in bg_burst: + if stop.is_set(): + return + stim.send(_CAN(identifier=bid, data=bytes(8))) + _time.sleep(0.010) + def ecu_simulation(): + _time.sleep(0.05) + stim.send(_CAN(identifier=0x7eb, data=_dhex("1016620001666c61"))) + fc_received.wait(timeout=10.0) + if not fc_received.is_set(): + return + _time.sleep(0.008) + stim.send(_CAN(identifier=0x7eb, data=_dhex("21677b5544535f44"))) + _time.sleep(0.010) + stim.send(_CAN(identifier=0x7eb, data=_dhex("224154415f524541"))) + _time.sleep(0.010) + stim.send(_CAN(identifier=0x7eb, data=_dhex("23447d"))) + def fc_watcher(): + while not stop.is_set(): + if _TestSocket.select([ecu_mon], 0.1): + pkt = ecu_mon.recv() + if pkt is not None and pkt.identifier == 0x7e3 and \ + len(pkt.data) >= 1 and bytes(pkt.data)[0] == 0x30: + fc_received.set() + return + bg_thread = _Thread(target=bg_generator) + ecu_thread = _Thread(target=ecu_simulation) + fc_thread = _Thread(target=fc_watcher) + bg_thread.start() + ecu_thread.start() + fc_thread.start() + result = sock.sr1(_ISOTP(data=_dhex("220001")), + retry=0, timeout=10.0, + threaded=threaded, verbose=0) + stop.set() + fc_received.set() + bg_thread.join(timeout=5) + ecu_thread.join(timeout=5) + fc_thread.join(timeout=5) + assert not bg_thread.is_alive(), "bg_thread still alive" + assert not ecu_thread.is_alive(), "ecu_thread still alive" + assert not fc_thread.is_alive(), "fc_thread still alive" + # Stop TimeoutScheduler while sockets are still open to + # avoid callbacks crashing on closed sockets and writing + # to stderr (causes fatal error on Python 3.13 Windows). + _ts_thread = _TimeoutScheduler._thread + _TimeoutScheduler.clear() + if _ts_thread is not None: + _ts_thread.join(timeout=5) + return result, response_data + += MF response: candle-like unlimited, no can_filters, threaded=False + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg={}, threaded=False, interface_name="candle") +assert result is not None, "MF response not received (candle, no filters, threaded=False)" +assert result.data == expected + += MF response: candle-like unlimited, no can_filters, threaded=True + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg={}, threaded=True, interface_name="candle") +assert result is not None, "MF response not received (candle, no filters, threaded=True)" +assert result.data == expected + += MF response: candle-like unlimited, can_filters=[0x7eb], threaded=False + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg=dict(can_filters=[0x7eb]), threaded=False, + interface_name="candle") +assert result is not None, "MF response not received (candle, can_filters, threaded=False)" +assert result.data == expected + += MF response: candle-like unlimited, can_filters=[0x7eb], threaded=True + +result, expected = run_mf_response_test( + frame_delay=0, mux_throttle=0, + filters_kwarg=dict(can_filters=[0x7eb]), threaded=True, + interface_name="candle") +assert result is not None, "MF response not received (candle, can_filters, threaded=True)" +assert result.data == expected + += MF response: slcan-like limited, no can_filters, threaded=False + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg={}, threaded=False, + prefill_frames=200) +assert result is not None, "MF response not received (slcan, no filters, threaded=False)" +assert result.data == expected + += MF response: slcan-like limited, no can_filters, threaded=True + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg={}, threaded=True, + prefill_frames=200) +assert result is not None, "MF response not received (slcan, no filters, threaded=True)" +assert result.data == expected + += MF response: slcan-like limited, can_filters=[0x7eb], threaded=False + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg=dict(can_filters=[0x7eb]), + threaded=False, prefill_frames=200) +assert result is not None, "MF response not received (slcan, can_filters, threaded=False)" +assert result.data == expected + += MF response: slcan-like limited, can_filters=[0x7eb], threaded=True + +result, expected = run_mf_response_test( + frame_delay=0.0025, mux_throttle=0.001, serial_timeout=0.1, + read_time_limit=0.02, filters_kwarg=dict(can_filters=[0x7eb]), + threaded=True, prefill_frames=200) +assert result is not None, "MF response not received (slcan, can_filters, threaded=True)" +assert result.data == expected + + + Cleanup = Delete testsockets cleanup_testsockets() +_ts = TimeoutScheduler._thread TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) log_runtime.removeHandler(handler) diff --git a/test/testsocket.py b/test/testsocket.py index d1a90a4da51..1ecd79f4fec 100644 --- a/test/testsocket.py +++ b/test/testsocket.py @@ -178,6 +178,172 @@ def recv(self, x=MTU, **kwargs): return super(UnstableSocket, self).recv(x, **kwargs) +class SlowTestSocket(TestSocket): + """A TestSocket that simulates the mux/throttle behavior of + PythonCANSocket on a slow serial interface (like slcan). + + Frames sent to this socket go into an intermediate serial buffer. + They only become visible to recv()/select() after mux() moves + them to the rx ObjectPipe. + + Key parameters model the real slcan timing bottleneck: + - frame_delay: per-frame serial read time (~2-3ms on real slcan) + - serial_timeout: blocking wait when serial buffer is empty. + Real python-can slcan uses serial.Serial(timeout=0.1), so + bus.recv(timeout=0) blocks for 100ms when buffer is empty. + - read_time_limit: max time spent reading per mux call, matching + SocketMapper.READ_BUS_TIME_LIMIT in production code. + + can_filters: Optional list of CAN identifiers for per-socket + filtering. When set, mux reads all frames but only delivers + matching ones, like SocketMapper.distribute() + _matches_filters. + """ + + def __init__(self, basecls=None, frame_delay=0.0002, + mux_throttle=0.001, can_filters=None, + serial_timeout=0.0, read_time_limit=0.0, + interface_name="slcan"): + # type: (Optional[Type[Packet]], float, float, Optional[List[int]], float, float, str) -> None # noqa: E501 + """ + :param frame_delay: Simulated per-frame serial read time (seconds). + :param mux_throttle: Minimum time between mux calls (default 1ms). + :param can_filters: Optional list of CAN identifiers for filtering. + :param serial_timeout: Time to block when serial buffer is empty + (models python-can slcan serial.Serial(timeout=0.1)). + Set to 0.1 to reproduce real slcan behavior. + :param read_time_limit: Max time per mux read pass (seconds). + Set to 0.02 to match SocketMapper.READ_BUS_TIME_LIMIT. + When 0 (default), no time limit is applied. + :param interface_name: Simulated interface name (default "slcan"). + Used in test descriptions to identify the adapter type. + """ + super(SlowTestSocket, self).__init__(basecls) + self.interface_name = interface_name + from collections import deque + self._serial_buffer = deque() # type: deque[bytes] + self._serial_lock = Lock() + self._last_mux = 0.0 + self._frame_delay = frame_delay + self._mux_throttle = mux_throttle + self._can_filters = can_filters + self._serial_timeout = serial_timeout + self._read_time_limit = read_time_limit + self._real_ins = self.ins + self.ins = _SlowPipeWrapper(self) # type: ignore[assignment] + + @staticmethod + def _extract_can_id(frame): + # type: (bytes) -> int + """Extract CAN identifier from raw CAN frame bytes.""" + import struct + if len(frame) < 4: + return -1 + return int(struct.unpack('!I', frame[:4])[0] & 0x1FFFFFFF) + + def _mux(self): + # type: () -> None + """Move frames from serial buffer to rx ObjectPipe. + + Models the real PythonCANSocket read path: + 1. read_bus(): loop calling bus.recv(timeout=0) — each call + takes frame_delay when data is available, or serial_timeout + when the buffer is empty (modeling slcan serial timeout). + 2. distribute(): deliver matching frames to the ObjectPipe. + + With read_time_limit > 0, the read loop stops after that many + seconds (matching SocketMapper.READ_BUS_TIME_LIMIT). + """ + now = time.monotonic() + if now - self._last_mux < self._mux_throttle: + return + + # Phase 1: read_bus — read frames from serial buffer + msgs = [] + deadline = time.monotonic() + self._read_time_limit \ + if self._read_time_limit > 0 else None + while True: + if self.closed: + break + with self._serial_lock: + if self._serial_buffer: + frame = self._serial_buffer.popleft() + else: + frame = None + if frame is None: + # Empty buffer: model the serial timeout blocking + if self._serial_timeout > 0: + time.sleep(self._serial_timeout) + break + if self._frame_delay > 0: + time.sleep(self._frame_delay) + msgs.append(frame) + if deadline and time.monotonic() >= deadline: + break + + # Phase 2: distribute — apply per-socket filtering + for frame in msgs: + if self._can_filters is not None: + can_id = self._extract_can_id(frame) + if can_id not in self._can_filters: + continue + self._real_ins.send(frame) + + self._last_mux = time.monotonic() + + def recv_raw(self, x=MTU): + # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] # noqa: E501 + """Read from the rx ObjectPipe (populated by mux via select).""" + return self.basecls, self._real_ins.recv(0), time.time() + + def send(self, x): + # type: (Packet) -> int + if self._frame_delay > 0: + time.sleep(self._frame_delay) + return super(SlowTestSocket, self).send(x) + + @staticmethod + def select(sockets, remain=conf.recv_poll_rate): + # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] + for s in sockets: + if isinstance(s, SlowTestSocket): + s._mux() + return select_objects(sockets, remain) + + def close(self): + # type: () -> None + self.ins = self._real_ins + super(SlowTestSocket, self).close() + + +class _SlowPipeWrapper: + """Wrapper that intercepts send() to route into serial buffer.""" + def __init__(self, owner): + # type: (SlowTestSocket) -> None + self._owner = owner + + def send(self, data): + # type: (bytes) -> None + with self._owner._serial_lock: + self._owner._serial_buffer.append(data) + + def recv(self, timeout=0): + # type: (int) -> Optional[bytes] + return self._owner._real_ins.recv(timeout) + + def fileno(self): + # type: () -> int + return self._owner._real_ins.fileno() + + def close(self): + # type: () -> None + self._owner._real_ins.close() + + @property + def closed(self): + # type: () -> bool + return bool(self._owner._real_ins.closed) # type: ignore[attr-defined] + + def cleanup_testsockets(): # type: () -> None """