diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index 4182d445336..5cfb1689308 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -166,7 +166,8 @@ def __init__(self, def close(self): # type: () -> None if not self.closed: - self.impl.close() + if hasattr(self, "impl"): + self.impl.close() self.closed = True def failure_analysis(self): @@ -202,8 +203,8 @@ def recv(self, x=0xffff, **kwargs): return msg @staticmethod - def select(sockets, remain=None): - # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] + def select(sockets, remain=None): # type: ignore[override] + # type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] # noqa: E501 """This function is called during sendrecv() routine to wait for sockets to be ready to receive """ @@ -214,8 +215,12 @@ def select(sockets, remain=None): ready_pipes = select_objects(obj_pipes, remain) - return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and - not x.closed and x.impl.rx_queue in ready_pipes] + result: List[Union[SuperSocket, ObjectPipe[Any]]] = [ + x for x in sockets if isinstance(x, ISOTPSoftSocket) and + not x.closed and x.impl.rx_queue in ready_pipes] + result += [x for x in sockets if isinstance(x, ObjectPipe) and + x in ready_pipes] + return result class TimeoutScheduler: diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index 2e7bdfaeccf..0f0a3992fda 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -954,6 +954,102 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s assert rx2 is None += ISOTPSoftSocket select returns control ObjectPipe + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") +close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x123, 0x321) as sock: + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result + +close_pipe.close() + += ISOTPSoftSocket select returns control ObjectPipe alongside ready rx_queue + +from scapy.automaton import ObjectPipe as _ObjectPipe + +close_pipe = _ObjectPipe("control_socket") +close_pipe.send(None) + +with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock: + sock.impl.rx_queue.send((b'\x62\xF1\x90\x41\x42\x43', 0.0)) + result = ISOTPSoftSocket.select([sock, close_pipe], remain=0) + +assert close_pipe in result +assert sock in result + +close_pipe.close() + += ISOTPSoftSocket sr1 SF request with MF response threaded + +from threading import Thread + +request = ISOTP(b'\x22\xF1\x90') +response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50' +response_msg = ISOTP(response_data) + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x641, 0x241) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x241, 0x641) as sock_rx: + isocan_rx.pair(isocan_tx) + def responder(): + sniffed = sock_rx.sniff(count=1, timeout=5) + if sniffed: + sock_rx.send(response_msg) + resp_thread = Thread(target=responder, daemon=True) + resp_thread.start() + time.sleep(0.1) + rx = sock_tx.sr1(request, timeout=5, verbose=False, threaded=True) + resp_thread.join(timeout=3) + +assert rx is not None +assert rx.data == response_data + += ISOTPSoftSocket sr1 timeout with threaded=True + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + elapsed = time.time() - start + +assert rx2 is None +assert elapsed < 5 + += ISOTPSoftSocket sr1 timeout with threaded=True and background traffic + +from threading import Thread, Event +msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33') + +with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \ + TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx: + isocan_rx.pair(isocan_tx) + stop_traffic = Event() + def bg_traffic(): + while not stop_traffic.is_set(): + try: + isocan_rx.send(CAN(identifier=0x456, data=dhex("01 02 03"))) + except Exception: + break + time.sleep(0.01) + traffic_thread = Thread(target=bg_traffic, daemon=True) + traffic_thread.start() + start = time.time() + rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True) + elapsed = time.time() - start + stop_traffic.set() + traffic_thread.join(timeout=2) + +assert rx2 is None +assert elapsed < 5 + = ISOTPSoftSocket sniff msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')