Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 75 additions & 15 deletions scapy/contrib/cansocket_python_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from functools import reduce
from operator import add

from collections import deque

from scapy.config import conf
Expand Down Expand Up @@ -55,24 +56,51 @@ def __init__(self, bus, sockets):
"""
self.bus = bus
self.sockets = sockets

def mux(self):
# type: () -> None
"""Multiplexer function. Tries to receive from its python-can bus
object. If a message is received, this message gets forwarded to
all receive queues of the SocketWrapper objects.
self.closing = False

# Maximum time (seconds) to spend reading frames in one read_bus()
# call. On serial interfaces (slcan) the final bus.recv(timeout=0)
# when the buffer is empty blocks for the serial port's read timeout
# (typically 100ms in python-can's slcan driver). During that block
# the TimeoutScheduler thread cannot run any other callbacks. By
# capping total read time, we ensure the scheduler stays responsive
# even on slow serial interfaces with heavy background traffic.
READ_BUS_TIME_LIMIT = 0.020 # 20 ms

def read_bus(self):
# type: () -> List[can_Message]
"""Read available frames from the bus, up to READ_BUS_TIME_LIMIT.

On slow serial interfaces (slcan), bus.recv(timeout=0) can
block for ~100ms when the serial buffer is empty (python-can's
slcan serial timeout). This method limits total time spent
reading so the TimeoutScheduler thread stays responsive.

This method intentionally does NOT hold pool_mutex so that
concurrent send() calls are not blocked during the serial I/O.
"""
if self.closing:
return []
msgs = []
deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT
while True:
try:
msg = self.bus.recv(timeout=0)
if msg is None:
break
else:
msgs.append(msg)
if time.monotonic() >= deadline:
break
except Exception as e:
warning("[MUX] python-can exception caught: %s" % e)

if not self.closing:
warning("[MUX] python-can exception caught: %s" % e)
break
return msgs

def distribute(self, msgs):
# type: (List[can_Message]) -> None
"""Distribute received messages to all subscribed sockets."""
for sock in self.sockets:
with sock.lock:
for msg in msgs:
Expand Down Expand Up @@ -132,9 +160,17 @@ def multiplex_rx_packets(self):
# this object is singleton and all python-CAN sockets are using
# the same instance and locking the same locks.
return
# Snapshot pool entries under the lock, then read from each bus
# WITHOUT holding pool_mutex. On slow serial interfaces (slcan)
# bus.recv(timeout=0) can take ~2-3ms per frame; holding the
# mutex during those reads would block send() for the entire
# duration.
with self.pool_mutex:
for t in self.pool.values():
t.mux()
mappers = list(self.pool.values())
for mapper in mappers:
msgs = mapper.read_bus()
if msgs:
mapper.distribute(msgs)
self.last_call = time.monotonic()

def register(self, socket, *args, **kwargs):
Expand All @@ -161,13 +197,36 @@ def register(self, socket, *args, **kwargs):
if k in self.pool:
t = self.pool[k]
t.sockets.append(socket)
filters = [s.filters for s in t.sockets
if s.filters is not None]
if filters:
t.bus.set_filters(reduce(add, filters))
# Update bus-level filters to the union of all sockets'
# filters. For non-slcan interfaces (socketcan, kvaser,
# vector), this enables efficient hardware/kernel
# filtering. For slcan, the bus filters were already
# cleared on creation, so this is a no-op (all sockets
# on slcan share the unfiltered bus).
if not k.lower().startswith('slcan'):
filters = [s.filters for s in t.sockets
if s.filters is not None]
if filters:
t.bus.set_filters(reduce(add, filters))
socket.name = k
else:
bus = can_Bus(*args, **kwargs)
# Serial interfaces like slcan only do software
# filtering inside BusABC.recv(): the recv loop reads
# one frame, finds it doesn't match, and returns
# None -- silently consuming serial bandwidth without
# returning the frame to the mux. This starves the
# mux on busy buses.
#
# For slcan, clear the filters from the bus so that
# bus.recv() returns ALL frames. Per-socket filtering
# in distribute() via _matches_filters() handles
# delivery. Other interfaces (socketcan, kvaser,
# vector, candle) perform efficient hardware/kernel
# filtering and should keep their bus-level filters.
if kwargs.get('can_filters') and \
k.lower().startswith('slcan'):
bus.set_filters(None)
socket.name = k
self.pool[k] = SocketMapper(bus, [socket])

Expand All @@ -188,6 +247,7 @@ def unregister(self, socket):
t = self.pool[socket.name]
t.sockets.remove(socket)
if not t.sockets:
t.closing = True
t.bus.shutdown()
del self.pool[socket.name]
except KeyError:
Expand Down Expand Up @@ -322,6 +382,7 @@ def select(sockets, remain=conf.recv_poll_rate):
:returns: an array of sockets that were selected and
the function to be called next to get the packets (i.g. recv)
"""
SocketsPool.multiplex_rx_packets()
ready_sockets = \
[s for s in sockets if isinstance(s, PythonCANSocket) and
len(s.can_iface.rx_queue)]
Expand All @@ -333,7 +394,6 @@ def select(sockets, remain=conf.recv_poll_rate):
# yield this thread to avoid starvation
time.sleep(0)

SocketsPool.multiplex_rx_packets()
return cast(List[SuperSocket], ready_sockets)

def close(self):
Expand Down
93 changes: 78 additions & 15 deletions scapy/contrib/isotp/isotp_soft_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def __init__(self,
def close(self):
# type: () -> None
if not self.closed:
self.impl.close()
if hasattr(self, "impl"):
self.impl.close()
self.closed = True

def failure_analysis(self):
Expand Down Expand Up @@ -202,8 +203,8 @@ def recv(self, x=0xffff, **kwargs):
return msg

@staticmethod
def select(sockets, remain=None):
# type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
def select(sockets, remain=None): # type: ignore[override]
# type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] # noqa: E501
"""This function is called during sendrecv() routine to wait for
sockets to be ready to receive
"""
Expand All @@ -214,8 +215,12 @@ def select(sockets, remain=None):

ready_pipes = select_objects(obj_pipes, remain)

return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and
not x.closed and x.impl.rx_queue in ready_pipes]
result: List[Union[SuperSocket, ObjectPipe[Any]]] = [
x for x in sockets if isinstance(x, ISOTPSoftSocket) and
not x.closed and x.impl.rx_queue in ready_pipes]
result += [x for x in sockets if isinstance(x, ObjectPipe) and
x in ready_pipes]
return result


class TimeoutScheduler:
Expand Down Expand Up @@ -251,6 +256,7 @@ def schedule(cls, timeout, callback):
# Start the scheduling thread if it is not started already
if cls._thread is None:
t = Thread(target=cls._task, name="TimeoutScheduler._task")
t.daemon = True
must_interrupt = False
cls._thread = t
cls._event.clear()
Expand Down Expand Up @@ -550,6 +556,7 @@ def __init__(self,
self.tx_handle = TimeoutScheduler.schedule(
self.rx_tx_poll_rate, self._send)
self.last_rx_call = 0.0
self.rx_start_time = 0.0

def failure_analysis(self):
# type: () -> None
Expand Down Expand Up @@ -592,12 +599,26 @@ def _get_padding_size(pl_size):
def can_recv(self):
# type: () -> None
self.last_rx_call = TimeoutScheduler._time()
if self.can_socket.select([self.can_socket], 0):
pkt = self.can_socket.recv()
if pkt:
self.on_can_recv(pkt)
try:
while self.can_socket.select([self.can_socket], 0):
pkt = self.can_socket.recv()
if pkt:
self.on_can_recv(pkt)
else:
break
except Exception:
if not self.closed:
log_isotp.warning("Error in can_recv: %s",
traceback.format_exc())
if not self.closed and not self.can_socket.closed:
if self.can_socket.select([self.can_socket], 0):
# Determine poll_time from ISOTP state only.
# Avoid calling select() here — on slow serial interfaces
# (slcan), each select() triggers a mux() call that reads
# N frames at ~2.5ms each, wasting time that could be spent
# processing frames already in the rx_queue.
if self.rx_state == ISOTP_WAIT_DATA or \
self.tx_state == ISOTP_WAIT_FC or \
self.tx_state == ISOTP_WAIT_FIRST_FC:
poll_time = 0.0
else:
poll_time = self.rx_tx_poll_rate
Expand Down Expand Up @@ -643,13 +664,46 @@ def close(self):
self.tx_handle.cancel()
except Scapy_Exception:
pass
if self.rx_timeout_handle is not None:
try:
self.rx_timeout_handle.cancel()
except Scapy_Exception:
pass
if self.tx_timeout_handle is not None:
try:
self.tx_timeout_handle.cancel()
except Scapy_Exception:
pass
try:
self.rx_queue.close()
except (OSError, EOFError):
pass
try:
self.tx_queue.close()
except (OSError, EOFError):
pass

def _rx_timer_handler(self):
# type: () -> None
"""Method called every time the rx_timer times out, due to the peer not
sending a consecutive frame within the expected time window"""

if self.closed:
return

if self.rx_state == ISOTP_WAIT_DATA:
# On slow serial interfaces (slcan), the mux reads frames
# from an OS serial buffer that may contain hundreds of
# background CAN frames. Consecutive Frames from the ECU
# are queued behind this backlog and can take several
# seconds to reach the ISOTP state machine. Extend the
# timeout up to 10 × cf_timeout to give the mux enough
# time to drain the backlog.
total_wait = TimeoutScheduler._time() - self.rx_start_time
if total_wait < self.cf_timeout * 10:
self.rx_timeout_handle = TimeoutScheduler.schedule(
self.cf_timeout, self._rx_timer_handler)
return
# we did not get new data frames in time.
# reset rx state
self.rx_state = ISOTP_IDLE
Expand All @@ -662,6 +716,9 @@ def _tx_timer_handler(self):
two situations: either a Flow Control frame was not received in time,
or the Separation Time Min is expired and a new frame must be sent."""

if self.closed:
return

if (self.tx_state == ISOTP_WAIT_FC or
self.tx_state == ISOTP_WAIT_FIRST_FC):
# we did not get any flow control frame in time
Expand Down Expand Up @@ -866,6 +923,7 @@ def _recv_ff(self, data, ts):
# initial setup for this pdu reception
self.rx_sn = 1
self.rx_state = ISOTP_WAIT_DATA
self.rx_start_time = TimeoutScheduler._time()

# no creation of flow control frames
if not self.listen_only:
Expand Down Expand Up @@ -994,11 +1052,16 @@ def begin_send(self, x):

def _send(self):
# type: () -> None
if self.tx_state == ISOTP_IDLE:
if select_objects([self.tx_queue], 0):
pkt = self.tx_queue.recv()
if pkt:
self.begin_send(pkt)
try:
if self.tx_state == ISOTP_IDLE:
if select_objects([self.tx_queue], 0):
pkt = self.tx_queue.recv()
if pkt:
self.begin_send(pkt)
except Exception:
if not self.closed:
log_isotp.warning("Error in _send: %s",
traceback.format_exc())

if not self.closed:
self.tx_handle = TimeoutScheduler.schedule(
Expand Down
Loading
Loading