Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions scapy/contrib/isotp/isotp_soft_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def __init__(self,
def close(self):
# type: () -> None
if not self.closed:
self.impl.close()
if hasattr(self, "impl"):
self.impl.close()
self.closed = True

def failure_analysis(self):
Expand Down Expand Up @@ -202,8 +203,8 @@ def recv(self, x=0xffff, **kwargs):
return msg

@staticmethod
def select(sockets, remain=None):
# type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
def select(sockets, remain=None): # type: ignore[override]
# type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] # noqa: E501
"""This function is called during sendrecv() routine to wait for
sockets to be ready to receive
"""
Expand All @@ -214,8 +215,12 @@ def select(sockets, remain=None):

ready_pipes = select_objects(obj_pipes, remain)

return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and
not x.closed and x.impl.rx_queue in ready_pipes]
result: List[Union[SuperSocket, ObjectPipe[Any]]] = [
x for x in sockets if isinstance(x, ISOTPSoftSocket) and
not x.closed and x.impl.rx_queue in ready_pipes]
result += [x for x in sockets if isinstance(x, ObjectPipe) and
x in ready_pipes]
return result


class TimeoutScheduler:
Expand Down
96 changes: 96 additions & 0 deletions test/contrib/isotp_soft_socket.uts
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,102 @@ with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as s

assert rx2 is None

= ISOTPSoftSocket select returns control ObjectPipe

from scapy.automaton import ObjectPipe as _ObjectPipe

close_pipe = _ObjectPipe("control_socket")
close_pipe.send(None)

with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x123, 0x321) as sock:
result = ISOTPSoftSocket.select([sock, close_pipe], remain=0)

assert close_pipe in result

close_pipe.close()

= ISOTPSoftSocket select returns control ObjectPipe alongside ready rx_queue

from scapy.automaton import ObjectPipe as _ObjectPipe

close_pipe = _ObjectPipe("control_socket")
close_pipe.send(None)

with TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, 0x641, 0x241) as sock:
sock.impl.rx_queue.send((b'\x62\xF1\x90\x41\x42\x43', 0.0))
result = ISOTPSoftSocket.select([sock, close_pipe], remain=0)

assert close_pipe in result
assert sock in result

close_pipe.close()

= ISOTPSoftSocket sr1 SF request with MF response threaded

from threading import Thread

request = ISOTP(b'\x22\xF1\x90')
response_data = b'\x62\xF1\x90' + b'\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4A\x4B\x4C\x4D\x4E\x4F\x50'
response_msg = ISOTP(response_data)

with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x641, 0x241) as sock_tx, \
TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x241, 0x641) as sock_rx:
isocan_rx.pair(isocan_tx)
def responder():
sniffed = sock_rx.sniff(count=1, timeout=5)
if sniffed:
sock_rx.send(response_msg)
resp_thread = Thread(target=responder, daemon=True)
resp_thread.start()
time.sleep(0.1)
rx = sock_tx.sr1(request, timeout=5, verbose=False, threaded=True)
resp_thread.join(timeout=3)

assert rx is not None
assert rx.data == response_data

= ISOTPSoftSocket sr1 timeout with threaded=True

from threading import Thread, Event
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')

with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \
TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx:
isocan_rx.pair(isocan_tx)
start = time.time()
rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True)
elapsed = time.time() - start

assert rx2 is None
assert elapsed < 5

= ISOTPSoftSocket sr1 timeout with threaded=True and background traffic

from threading import Thread, Event
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')

with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \
TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx:
isocan_rx.pair(isocan_tx)
stop_traffic = Event()
def bg_traffic():
while not stop_traffic.is_set():
try:
isocan_rx.send(CAN(identifier=0x456, data=dhex("01 02 03")))
except Exception:
break
time.sleep(0.01)
traffic_thread = Thread(target=bg_traffic, daemon=True)
traffic_thread.start()
start = time.time()
rx2 = sock_tx.sr1(msg, timeout=3, verbose=False, threaded=True)
elapsed = time.time() - start
stop_traffic.set()
traffic_thread.join(timeout=2)

assert rx2 is None
assert elapsed < 5

= ISOTPSoftSocket sniff

msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
Expand Down
Loading