diff --git a/scapy/contrib/j1939.py b/scapy/contrib/j1939.py new file mode 100644 index 00000000000..6cba894241d --- /dev/null +++ b/scapy/contrib/j1939.py @@ -0,0 +1,780 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) 2024 Scapy contributors + +# scapy.contrib.description = SAE J1939 Vehicle Network Protocol +# scapy.contrib.status = loads + +""" +SAE J1939 - Vehicle network protocol for heavy-duty vehicles. + +J1939 uses 29-bit extended CAN identifiers to encode a structured addressing +scheme. The 29-bit identifier is partitioned as follows:: + + Bits 28-26 : Priority (3 bits, 0 = highest) + Bit 25 : Reserved (1 bit) + Bit 24 : Data Page (1 bit) + Bits 23-16 : PDU Format (8 bits, PF) + Bits 15-8 : PDU Specific (8 bits, PS) + PF < 240 → Destination Address (PDU1, peer-to-peer) + PF ≥ 240 → Group Extension (PDU2, broadcast) + Bits 7-0 : Source Address (8 bits, SA) + +PGN (Parameter Group Number): + PDU1 (PF < 240): PGN = (DP << 16) | (PF << 8) — PS is DA + PDU2 (PF ≥ 240): PGN = (DP << 16) | (PF << 8) | GE — broadcast only + +References: + - SAE J1939 standard + - Linux kernel J1939 documentation: + https://www.kernel.org/doc/html/latest/networking/j1939.html +""" + +import socket +import struct +import logging +import time + +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, + Type, +) + +from scapy.config import conf +from scapy.data import SO_TIMESTAMPNS +from scapy.error import Scapy_Exception, log_runtime +from scapy.fields import ( + BitField, + ByteField, + FieldLenField, + FlagsField, + LEShortField, + ShortField, + StrField, + StrFixedLenField, + StrLenField, + ThreeBytesField, + XLE3BytesField, +) +from scapy.layers.can import CAN +from scapy.packet import Packet +from scapy.supersocket import SuperSocket +from scapy.compat import raw + +log_j1939 = logging.getLogger("scapy.contrib.j1939") + +# --------------------------------------------------------------------------- +# J1939 constants (sourced from Python socket module where available) +# socket.CAN_J1939 and related constants were added in Python 3.9. +# Fallback values are taken from the Linux kernel header linux/can/j1939.h. +# --------------------------------------------------------------------------- + +#: 64-bit NAME not used / not relevant +J1939_NO_NAME = getattr(socket, 'J1939_NO_NAME', 0) # 0 +#: PGN wildcard – match any PGN when used in bind / filter +J1939_NO_PGN = getattr(socket, 'J1939_NO_PGN', 0x40000000) # 0x40000000 +#: Address wildcard – no specific address +J1939_NO_ADDR = getattr(socket, 'J1939_NO_ADDR', 0xFF) # 0xFF +#: Idle/null address (used during address claiming) +J1939_IDLE_ADDR = getattr(socket, 'J1939_IDLE_ADDR', 0xFE) # 0xFE +#: Maximum normal (unicast) address value +J1939_MAX_UNICAST_ADDR = getattr(socket, 'J1939_MAX_UNICAST_ADDR', 0xFD) # 0xFD +#: Global broadcast address +J1939_BROADCAST_ADDR = J1939_NO_ADDR # 0xFF + +# PGN constants +J1939_PGN_REQUEST = getattr(socket, 'J1939_PGN_REQUEST', 0xEA00) # 0xEA00 +J1939_PGN_ADDRESS_CLAIMED = getattr( # 0xEE00 + socket, 'J1939_PGN_ADDRESS_CLAIMED', 0xEE00) +J1939_PGN_ADDRESS_COMMANDED = getattr( # 0xFED8 + socket, 'J1939_PGN_ADDRESS_COMMANDED', 0xFED8) +J1939_PGN_MAX = getattr(socket, 'J1939_PGN_MAX', 0x3FFFF) # 0x3FFFF +J1939_PGN_PDU1_MAX = getattr(socket, 'J1939_PGN_PDU1_MAX', 0x3FF00) # 0x3FF00 +#: Transport Protocol – Connection Management +J1939_PGN_TP_CM = 0xEC00 +#: Transport Protocol – Data Transfer +J1939_PGN_TP_DT = 0xEB00 + +# TP control byte values (integer constants; the classes share the prefix name) +J1939_TP_CTRL_RTS = 16 # Request To Send +J1939_TP_CTRL_CTS = 17 # Clear To Send +J1939_TP_CTRL_ACK = 19 # End of Message Acknowledge +J1939_TP_CTRL_BAM = 32 # Broadcast Announce Message +J1939_TP_CTRL_ABORT = 255 # Connection Abort + +# PDU format threshold: PF < 240 → PDU1 (peer-to-peer), PF ≥ 240 → PDU2 (broadcast) +J1939_PDU1_MAX_PF = 239 + +# Socket-level constants (linux/can/j1939.h; fallbacks for Python < 3.9) +CAN_J1939 = getattr(socket, 'CAN_J1939', 7) # 7 +SOL_CAN_BASE = 100 +SOL_CAN_J1939 = SOL_CAN_BASE + CAN_J1939 # 107 +SO_J1939_FILTER = getattr(socket, 'SO_J1939_FILTER', 1) # 1 +SO_J1939_PROMISC = getattr(socket, 'SO_J1939_PROMISC', 2) # 2 +SO_J1939_SEND_PRIO = getattr(socket, 'SO_J1939_SEND_PRIO', 3) # 3 +SO_J1939_ERRQUEUE = getattr(socket, 'SO_J1939_ERRQUEUE', 4) # 4 +SCM_J1939_DEST_ADDR = getattr(socket, 'SCM_J1939_DEST_ADDR', 1) # 1 +SCM_J1939_DEST_NAME = getattr(socket, 'SCM_J1939_DEST_NAME', 2) # 2 +SCM_J1939_PRIO = getattr(socket, 'SCM_J1939_PRIO', 3) # 3 +SCM_J1939_ERRQUEUE = getattr(socket, 'SCM_J1939_ERRQUEUE', 4) # 4 + +# Default configuration key +conf.contribs['J1939'] = {'channel': 'can0'} + +# Common source address names (informational) +J1939_ADDR_NAMES = { + 0x00: "Engine #1", + 0x01: "Engine #2", + 0x02: "Turbocharger", + 0x03: "Transmission #1", + 0x04: "Transmission #2", + 0x0B: "Brakes - System Controller", + 0x0F: "Instrument Cluster #1", + 0x11: "Trip Recorder", + 0x15: "Retarder, Exhaust, Engine #1", + 0x17: "Cruise Control", + 0x21: "Transmission, Automatic #1", + 0x27: "Clutch/Converter Unit", + 0x28: "Auxiliary Valve Control", + 0x29: "Auxiliary Valve Control #2", + 0xEF: "Null/Reserved", + 0xFE: "NULL (no address)", + 0xFF: "Global (broadcast)", +} + + +# --------------------------------------------------------------------------- +# Helper functions +# --------------------------------------------------------------------------- + +def pgn_is_pdu1(pgn): + # type: (int) -> bool + """Return True if *pgn* is a PDU1 (peer-to-peer) Parameter Group Number.""" + return ((pgn >> 8) & 0xFF) <= J1939_PDU1_MAX_PF + + +def can_id_to_j1939(can_id): + # type: (int) -> Dict[str, int] + """Decode a 29-bit CAN identifier to a dictionary of J1939 fields. + + :param can_id: 29-bit extended CAN identifier + :returns: dict with keys ``priority``, ``reserved``, ``data_page``, + ``pdu_format``, ``pdu_specific``, ``src`` + """ + return { + 'priority': (can_id >> 26) & 0x7, + 'reserved': (can_id >> 25) & 0x1, + 'data_page': (can_id >> 24) & 0x1, + 'pdu_format': (can_id >> 16) & 0xFF, + 'pdu_specific': (can_id >> 8) & 0xFF, + 'src': can_id & 0xFF, + } + + +def j1939_to_can_id(priority, reserved, data_page, pdu_format, pdu_specific, src): + # type: (int, int, int, int, int, int) -> int + """Encode J1939 fields into a 29-bit CAN identifier. + + :returns: 29-bit CAN identifier value + """ + return ( + (priority & 0x7) << 26 | + (reserved & 0x1) << 25 | + (data_page & 0x1) << 24 | + (pdu_format & 0xFF) << 16 | + (pdu_specific & 0xFF) << 8 | + (src & 0xFF) + ) + + +def pgn_from_fields(data_page, pdu_format, pdu_specific): + # type: (int, int, int) -> int + """Compute the PGN from J1939 CAN identifier sub-fields. + + :param data_page: data page bit (0 or 1) + :param pdu_format: PDU format byte (0-255) + :param pdu_specific: PDU specific byte (0-255) + :returns: 18-bit PGN value + """ + if pdu_format <= J1939_PDU1_MAX_PF: + # PDU1: PS is destination address – not included in PGN + return (data_page << 16) | (pdu_format << 8) + else: + # PDU2: PS is group extension – included in PGN + return (data_page << 16) | (pdu_format << 8) | pdu_specific + + +def dst_from_fields(pdu_format, pdu_specific): + # type: (int, int) -> int + """Return the destination address encoded in J1939 identifier fields. + + :param pdu_format: PDU format byte (0-255) + :param pdu_specific: PDU specific byte (0-255) + :returns: destination address (0x00-0xFF), or ``J1939_NO_ADDR`` for PDU2 + """ + if pdu_format <= J1939_PDU1_MAX_PF: + return pdu_specific + return J1939_NO_ADDR + + +# --------------------------------------------------------------------------- +# J1939 application-layer packet +# --------------------------------------------------------------------------- + +class J1939(Packet): + """SAE J1939 application-layer message. + + This class represents a J1939 message at the application layer. When used + with :class:`NativeJ1939Socket`, the Linux kernel J1939 stack handles + transport-protocol framing (segmentation / reassembly) automatically, so + ``data`` may be larger than 8 bytes. + + Addressing information – ``priority``, ``pgn``, ``src``, ``dst`` – is + stored in :attr:`__slots__` rather than as wire fields (the same approach + used by :class:`~scapy.contrib.isotp.ISOTP`). These attributes are + populated by :class:`NativeJ1939Socket` upon reception. + + Example:: + + >>> msg = J1939(b'\\x01\\x02\\x03', pgn=0xFECA, src=0x00, dst=0xFF) + >>> msg.pgn + 65226 + >>> msg.src + 0 + """ + + name = 'J1939' + fields_desc = [ + StrField('data', b'') + ] + __slots__ = Packet.__slots__ + ['priority', 'pgn', 'src', 'dst'] + + def __init__(self, *args, **kwargs): + # type: (*Any, **Any) -> None + self.priority = kwargs.pop('priority', 6) # type: int + self.pgn = kwargs.pop('pgn', 0) # type: int + self.src = kwargs.pop('src', J1939_NO_ADDR) # type: int + self.dst = kwargs.pop('dst', J1939_NO_ADDR) # type: int + Packet.__init__(self, *args, **kwargs) + + def answers(self, other): + # type: (Packet) -> int + if not isinstance(other, J1939): + return 0 + return self.data == other.data + + def mysummary(self): + # type: () -> str + # Addressing is in __slots__, not wire fields, so build the summary directly. + return "J1939 PGN=0x%05X SA=0x%02X DA=0x%02X prio=%d" % ( + self.pgn, self.src, self.dst, self.priority + ) + + +# --------------------------------------------------------------------------- +# J1939 CAN-frame-level packet +# --------------------------------------------------------------------------- + +class J1939_CAN(CAN): + """J1939 CAN frame – the 29-bit extended CAN identifier decoded as J1939. + + Inherits from :class:`~scapy.layers.can.CAN` so that all CAN lifecycle + methods are reused automatically: + + * ``pre_dissect`` / ``post_build`` – byte-order swap controlled by + ``conf.contribs['CAN']['swap-bytes']`` (Wireshark vs PF_CAN format). + * ``extract_padding`` – padding removal controlled by + ``conf.contribs['CAN']['remove-padding']``. + + The only structural difference from :class:`~scapy.layers.can.CAN` is + that the 29-bit ``identifier`` field is decomposed into the six J1939 + sub-fields (``priority``, ``reserved``, ``data_page``, ``pdu_format``, + ``pdu_specific``, ``src``), while the wire layout remains **identical**. + + CAN identifier sub-fields:: + + priority (bits 28-26): message priority, 0 = highest, 7 = lowest + reserved (bit 25) : reserved, should be 0 + data_page (bit 24) : selects one of two parameter group tables + pdu_format (bits 23-16): determines PDU type (< 240 → PDU1) + pdu_specific(bits 15-8) : DA if PDU1, Group Extension if PDU2 + src (bits 7-0) : source address + + Convenience properties :attr:`pgn` and :attr:`dst` are derived from the + sub-fields. + + Example:: + + >>> pkt = J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCA, + ... src=0x00, data=b'\\xff' * 8) + >>> hex(pkt.pgn) + '0xfeca' + >>> hex(pkt.dst) + '0xff' + """ + + name = 'J1939_CAN' + fields_desc = [ + # ── first 32 bits: CAN flags(3) + J1939 identifier fields(29) ────── + FlagsField('flags', 0b100, 3, + ['error', 'remote_transmission_request', 'extended']), + BitField('priority', 6, 3), # J1939 priority + BitField('reserved', 0, 1), # Reserved bit + BitField('data_page', 0, 1), # Data Page (DP) + ByteField('pdu_format', 0xFE), # PDU Format (PF) + ByteField('pdu_specific', 0xFF), # PDU Specific (PS): DA or GE + ByteField('src', 0xFE), # Source Address (SA) + # ── standard CAN data-length + padding ──────────────────────────── + FieldLenField('length', None, length_of='data', fmt='B'), + ThreeBytesField('reserved2', 0), + StrLenField('data', b'', length_from=lambda p: int(p.length)), + ] + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + # type: (Optional[bytes], *Any, **Any) -> Type[Packet] + # Always decode as J1939_CAN; do not redirect to plain CAN or CANFD. + return cls + + @property + def pgn(self): + # type: () -> int + """PGN (Parameter Group Number) derived from ``data_page``, + ``pdu_format``, and ``pdu_specific``.""" + return pgn_from_fields(self.data_page, self.pdu_format, self.pdu_specific) + + @property + def dst(self): + # type: () -> int + """Destination address for PDU1 frames; :data:`J1939_NO_ADDR` for PDU2.""" + return dst_from_fields(self.pdu_format, self.pdu_specific) + + def to_can(self): + # type: () -> CAN + """Convert to a standard :class:`~scapy.layers.can.CAN` packet. + + The wire bytes are identical so this is simply a class change. + """ + return CAN(bytes(self)) + + @classmethod + def from_can(cls, pkt): + # type: (CAN) -> J1939_CAN + """Create a :class:`J1939_CAN` from a :class:`~scapy.layers.can.CAN` packet. + + The wire bytes are identical so this is simply a class change. + The packet timestamp is preserved from *pkt*. + """ + result = cls(bytes(pkt)) + result.time = pkt.time + return result + + def mysummary(self): + # type: () -> str + return "J1939_CAN PGN=0x%05X SA=0x%02X" % (self.pgn, self.src) + + +# --------------------------------------------------------------------------- +# J1939 Transport Protocol (TP) frames +# --------------------------------------------------------------------------- +# TP allows up to 1785 bytes per multi-packet session using PGN 0xEC00 +# (TP.CM – Connection Management) and PGN 0xEB00 (TP.DT – Data Transfer). + +_TP_CM_CTRL_NAMES = { + J1939_TP_CTRL_RTS: 'RTS', + J1939_TP_CTRL_CTS: 'CTS', + J1939_TP_CTRL_ACK: 'EOM_ACK', + J1939_TP_CTRL_BAM: 'BAM', + J1939_TP_CTRL_ABORT: 'ABORT', +} + +_TP_ABORT_REASON = { + 1: 'Already in connection', + 2: 'System resources', + 3: 'Timeout', + 4: 'CTS while DT in progress', + 5: 'Max retransmit exceeded', + 6: 'Unexpected DT packet', + 7: 'Bad sequence number', + 8: 'Duplicate sequence number', + 250: 'Other', + 251: 'Other', + 252: 'Other', + 253: 'Other', + 254: 'Other', + 255: 'Other', +} + + +class J1939_TP_CM_RTS(Packet): + """J1939 TP Connection Management – Request To Send (RTS). + + Sent before a peer-to-peer multi-packet message to announce the total + size and packet count. Uses PGN 0xEC00. + """ + name = 'J1939_TP_CM_RTS' + fields_desc = [ + ByteField('ctrl', J1939_TP_CTRL_RTS), # 16 + LEShortField('total_size', 0), # total message size (bytes) + ByteField('num_packets', 0), # total number of TP.DT packets + ByteField('max_packets', 0xFF), # max packets per CTS (0xFF = no limit) + XLE3BytesField('pgn', 0), # PGN of the message being transferred + ] + + +class J1939_TP_CM_CTS(Packet): + """J1939 TP Connection Management – Clear To Send (CTS). + + Response to :class:`J1939_TP_CM_RTS`; authorises the sender to transmit + up to *num_packets* TP.DT packets starting from *next_packet*. + """ + name = 'J1939_TP_CM_CTS' + fields_desc = [ + ByteField('ctrl', J1939_TP_CTRL_CTS), # 17 + ByteField('num_packets', 0), # number of packets to send now + ByteField('next_packet', 1), # next expected sequence number + ShortField('reserved', 0xFFFF), + XLE3BytesField('pgn', 0), # PGN of the message + ] + + +class J1939_TP_CM_ACK(Packet): + """J1939 TP Connection Management – End of Message Acknowledge (EoMAck). + + Sent by the receiver after all TP.DT packets have been received. + """ + name = 'J1939_TP_CM_ACK' + fields_desc = [ + ByteField('ctrl', J1939_TP_CTRL_ACK), # 19 + LEShortField('total_size', 0), # total message size + ByteField('num_packets', 0), # total TP.DT packets received + ByteField('reserved', 0xFF), + XLE3BytesField('pgn', 0), # PGN of the message + ] + + +class J1939_TP_CM_BAM(Packet): + """J1939 TP Connection Management – Broadcast Announce Message (BAM). + + Announces a forthcoming multi-packet broadcast; no CTS handshake is used. + """ + name = 'J1939_TP_CM_BAM' + fields_desc = [ + ByteField('ctrl', J1939_TP_CTRL_BAM), # 32 + LEShortField('total_size', 0), # total message size (bytes) + ByteField('num_packets', 0), # total number of TP.DT packets + ByteField('reserved', 0xFF), + XLE3BytesField('pgn', 0), # PGN of the message + ] + + +class J1939_TP_CM_ABORT(Packet): + """J1939 TP Connection Management – Connection Abort.""" + name = 'J1939_TP_CM_ABORT' + fields_desc = [ + ByteField('ctrl', J1939_TP_CTRL_ABORT), # 255 + ByteField('reason', 0), # abort reason + ShortField('reserved', 0xFFFF), + ByteField('reserved2', 0xFF), + XLE3BytesField('pgn', 0), # PGN of the aborted message + ] + + +class J1939_TP_CM(Packet): + """J1939 TP Connection Management frame dispatcher. + + Parses a raw 8-byte TP.CM payload and returns the appropriate sub-class. + + Example:: + + >>> J1939_TP_CM(bytes([32, 20, 0, 3, 0xFF, 0xCA, 0xFE, 0x00])) + + """ + name = 'J1939_TP_CM' + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + # type: (Optional[bytes], *Any, **Any) -> Type[Packet] + if _pkt and len(_pkt) >= 1: + ctrl = _pkt[0] + if ctrl == J1939_TP_CTRL_RTS: + return J1939_TP_CM_RTS + elif ctrl == J1939_TP_CTRL_CTS: + return J1939_TP_CM_CTS + elif ctrl == J1939_TP_CTRL_ACK: + return J1939_TP_CM_ACK + elif ctrl == J1939_TP_CTRL_BAM: + return J1939_TP_CM_BAM + elif ctrl == J1939_TP_CTRL_ABORT: + return J1939_TP_CM_ABORT + return cls + + def do_dissect(self, s): + # type: (bytes) -> bytes + return s + + +class J1939_TP_DT(Packet): + """J1939 TP Data Transfer frame. + + Each TP.DT packet carries up to 7 bytes of payload; the first byte is the + sequence number (1–255). Unused bytes are padded with ``0xFF``. + """ + name = 'J1939_TP_DT' + fields_desc = [ + ByteField('seq_num', 1), # sequence number 1-255 + StrFixedLenField('data', b'\xff' * 7, 7), # 7 data bytes (0xFF = unused) + ] + + +# --------------------------------------------------------------------------- +# NativeJ1939Socket +# --------------------------------------------------------------------------- + +class NativeJ1939Socket(SuperSocket): + """Linux kernel J1939 socket (``PF_CAN / SOCK_DGRAM / CAN_J1939``). + + The kernel J1939 stack handles transport-protocol framing automatically: + messages larger than 8 bytes are fragmented / reassembled transparently, + and the application deals only with complete J1939 messages. + + .. note:: Design – why not inherit from ``NativeCANSocket``? + + :class:`~scapy.contrib.cansocket_native.NativeCANSocket` uses + ``SOCK_RAW / CAN_RAW``, while this class uses + ``SOCK_DGRAM / CAN_J1939``. The socket type, protocol, ``send()`` + logic (``sendto`` with 4-tuple destination vs plain ``send``), + ``recv()`` logic (``recvmsg`` for J1939 ancillary data vs raw bytes + + byte-order swap), and address binding API are all fundamentally + different. Inheriting from ``NativeCANSocket`` would override or + bypass every method, making the hierarchy misleading rather than + helpful. + + :param channel: CAN interface name (default: ``can0``) + :param src_name: 64-bit J1939 NAME of this node (0 = no name) + :param src_addr: Source address to bind to (:data:`J1939_NO_ADDR` for + promiscuous reception of all addresses) + :param pgn: PGN to bind to (:data:`J1939_NO_PGN` for all PGNs) + :param promisc: Enable promiscuous mode – receive all J1939 messages on + the interface regardless of destination address + :param filters: Optional list of ``j1939_filter`` dicts; each may + contain the keys ``name``, ``name_mask``, ``pgn``, + ``pgn_mask``, ``addr``, ``addr_mask`` + :param basecls: Packet class used to wrap received payloads + (default: :class:`J1939`) + + Example – sniff all J1939 traffic on *vcan0*:: + + >>> sock = NativeJ1939Socket("vcan0", promisc=True) + >>> pkt = sock.recv() + >>> print(pkt.pgn, pkt.src, pkt.data) + + Example – send a J1939 message:: + + >>> sock = NativeJ1939Socket("vcan0", src_addr=0x00) + >>> sock.send(J1939(data=b'\\x01\\x02', pgn=0xFECA, dst=0xFF)) + """ + + desc = "read/write J1939 messages using Linux kernel PF_CAN/CAN_J1939 sockets" + + # struct j1939_filter: name(Q) name_mask(Q) pgn(I) pgn_mask(I) addr(B) addr_mask(B) + _J1939_FILTER_FMT = "=QQIIBB" + _J1939_FILTER_PAD = b'\x00' * 4 # 4 bytes padding to reach 28-byte alignment + + def __init__( + self, + channel=None, # type: Optional[str] + src_name=J1939_NO_NAME, # type: int + src_addr=J1939_NO_ADDR, # type: int + pgn=J1939_NO_PGN, # type: int + promisc=True, # type: bool + filters=None, # type: Optional[List[Dict[str, int]]] + basecls=J1939, # type: Type[Packet] + **kwargs # type: Any + ): + # type: (...) -> None + self.channel = channel or conf.contribs['J1939']['channel'] + self.src_name = src_name + self.src_addr = src_addr + self.pgn = pgn + self.basecls = basecls + + self.ins = socket.socket( + socket.PF_CAN, socket.SOCK_DGRAM, CAN_J1939 + ) + + if promisc: + try: + self.ins.setsockopt( + SOL_CAN_J1939, + SO_J1939_PROMISC, + struct.pack('i', 1), + ) + except OSError as exc: + raise Scapy_Exception( + "Could not enable J1939 promiscuous mode: %s" % exc + ) + + # Enable ancillary data so we can read destination address and priority + try: + self.ins.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1) + self.auxdata_available = True + except OSError: + self.auxdata_available = False + log_runtime.info("SO_TIMESTAMPNS not supported on this kernel") + + if filters is not None: + self._set_filters(filters) + + self.ins.bind((self.channel, src_name, pgn, src_addr)) + self.outs = self.ins + + def _set_filters(self, filters): + # type: (List[Dict[str, int]]) -> None + """Apply a list of J1939 filters to the socket. + + Each filter dict may contain any of: + ``name``, ``name_mask``, ``pgn``, ``pgn_mask``, ``addr``, ``addr_mask``. + """ + packed = b'' + for f in filters: + packed += struct.pack( + self._J1939_FILTER_FMT, + f.get('name', J1939_NO_NAME), + f.get('name_mask', J1939_NO_NAME), + f.get('pgn', J1939_NO_PGN), + f.get('pgn_mask', J1939_NO_PGN), + f.get('addr', J1939_NO_ADDR), + f.get('addr_mask', J1939_NO_ADDR), + ) + self._J1939_FILTER_PAD + try: + self.ins.setsockopt(SOL_CAN_J1939, SO_J1939_FILTER, packed) + except OSError as exc: + raise Scapy_Exception( + "Could not set J1939 filter: %s" % exc + ) + + def recv_raw(self, x=4096): + # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] + """Returns a tuple ``(cls, pkt_data, timestamp)``. + + .. note:: + The returned *pkt_data* is only the raw J1939 payload bytes. + Addressing metadata (PGN, source/destination address, priority) is + unavailable through this low-level interface; use :meth:`recv` + instead to obtain a fully populated :class:`J1939` packet. + """ + try: + pkt_data = self.ins.recv(x) + except BlockingIOError: + log_j1939.warning('Captured no data, socket in non-blocking mode.') + return None, None, None + except socket.timeout: + log_j1939.warning('Captured no data, socket read timed out.') + return None, None, None + except OSError as exc: + log_j1939.warning('Captured no data: %s', exc) + return None, None, None + + return self.basecls, pkt_data, None + + def recv(self, x=4096, **kwargs): + # type: (int, **Any) -> Optional[Packet] + """Receive one J1939 message, including addressing metadata. + + Returns a :attr:`basecls` instance (default: :class:`J1939`) with + ``priority``, ``pgn``, ``src``, and ``dst`` populated from the kernel. + """ + try: + data, ancdata, _flags, addr = self.ins.recvmsg(x, 256) + except BlockingIOError: + log_j1939.warning('Captured no data, socket in non-blocking mode.') + return None + except socket.timeout: + log_j1939.warning('Captured no data, socket read timed out.') + return None + except OSError as exc: + log_j1939.warning('Captured no data: %s', exc) + return None + + # addr = (iface_name, name, pgn, src_addr) + _iface, _src_name, src_pgn, src_addr = addr + + dst_addr = J1939_NO_ADDR + priority = 6 + ts = None + + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_type == SCM_J1939_DEST_ADDR: + if cmsg_data: + dst_addr = struct.unpack('B', cmsg_data[:1])[0] + elif cmsg_type == SCM_J1939_PRIO: + if cmsg_data: + priority = struct.unpack('B', cmsg_data[:1])[0] + + if ts is None: + ts = time.time() + + try: + pkt = self.basecls( + data, + priority=priority, + pgn=src_pgn, + src=src_addr, + dst=dst_addr, + ) + except Exception: + pkt = self.basecls(data) + + pkt.time = ts + return pkt + + def send(self, x): + # type: (Packet) -> int + """Send a J1939 message. + + If *x* is a :class:`J1939` packet, the ``pgn``, ``dst``, and + ``priority`` attributes are used. For other packet types the raw bytes + are sent to the socket's default destination. + """ + if x is None: + return 0 + + try: + x.sent_time = time.time() + except AttributeError: + pass + + # Extract payload bytes + if isinstance(x, J1939): + data = x.data if isinstance(x.data, bytes) else raw(x) + dst_pgn = x.pgn if x.pgn != 0 else J1939_NO_PGN + dst_addr = x.dst + priority = x.priority + else: + data = raw(x) + dst_pgn = J1939_NO_PGN + dst_addr = J1939_NO_ADDR + priority = 6 + + # Set per-message priority + try: + self.outs.setsockopt( + SOL_CAN_J1939, + SO_J1939_SEND_PRIO, + struct.pack('i', priority), + ) + except OSError: + pass # not critical + + dst = (self.channel, J1939_NO_NAME, dst_pgn, dst_addr) + try: + return self.outs.sendto(data, dst) + except OSError as exc: + log_j1939.error("Failed to send J1939 packet: %s", exc) + return 0 diff --git a/test/contrib/j1939.uts b/test/contrib/j1939.uts new file mode 100644 index 00000000000..ff83df0d520 --- /dev/null +++ b/test/contrib/j1939.uts @@ -0,0 +1,964 @@ +% Regression tests for SAE J1939 protocol +~ not_pypy linux + +# More information at http://www.secdev.org/projects/UTscapy/ + +############ +############ ++ Setup +~ conf + += Load J1939 module + +load_contrib("j1939", globals_dict=globals()) +from scapy.contrib.j1939 import ( + J1939, J1939_CAN, + J1939_TP_CM, J1939_TP_CM_RTS, J1939_TP_CM_CTS, J1939_TP_CM_ACK, + J1939_TP_CM_BAM, J1939_TP_CM_ABORT, J1939_TP_DT, + J1939_NO_ADDR, J1939_NO_PGN, J1939_NO_NAME, + J1939_IDLE_ADDR, J1939_BROADCAST_ADDR, + J1939_PGN_TP_CM, J1939_PGN_TP_DT, + J1939_TP_CTRL_RTS, J1939_TP_CTRL_CTS, J1939_TP_CTRL_ACK, + J1939_TP_CTRL_BAM, J1939_TP_CTRL_ABORT, + can_id_to_j1939, j1939_to_can_id, pgn_from_fields, dst_from_fields, + pgn_is_pdu1, +) + += hexadecimal helper + +dhex = bytes.fromhex + + +############ +############ ++ Helper function tests + += pgn_is_pdu1 with PDU1 PGN (PF < 240) +assert pgn_is_pdu1(0xEC00) == True +assert pgn_is_pdu1(0xEA00) == True +assert pgn_is_pdu1(0x0100) == True + += pgn_is_pdu1 with PDU2 PGN (PF >= 240) +assert pgn_is_pdu1(0xFECA) == False +assert pgn_is_pdu1(0xFF00) == False +assert pgn_is_pdu1(0xF004) == False + += can_id_to_j1939 – typical broadcast frame (CAN ID 0x18FECA00) +fields = can_id_to_j1939(0x18FECA00 & 0x1FFFFFFF) +# 0x18FECA00 as 29-bit id: 0x0CFECA00 >> ... let's compute properly +# CAN ID bits [28:0] = 0x18FECA00 & 0x1FFFFFFF = 0x18FECA00 & 0x1FFFFFFF +# priority = bits 28-26 of 0x18FECA00 = (0x18FECA00 >> 26) & 7 = 6 +# reserved = bit 25 = 0 +# dp = bit 24 = 1? let's recalculate +# 0x18FECA00 = 0b0001_1000_1111_1110_1100_1010_0000_0000 +# bits 28-26 = 0b110 = 6 +# bit 25 = 0 +# bit 24 = 0 (actually let's check: 0x18 = 0001_1000, bits 28-24 = 0b1_1000 = 24 = priority=6 r=0 dp=0) +# So for 0x18FECA00: priority=6, r=0, dp=0, pf=0xFE, ps=0xCA, sa=0x00 +can_id_29bit = 0x18FECA00 & 0x1FFFFFFF +fields = can_id_to_j1939(can_id_29bit) +assert fields['priority'] == 6 +assert fields['reserved'] == 0 +assert fields['data_page'] == 0 +assert fields['pdu_format'] == 0xFE +assert fields['pdu_specific'] == 0xCA +assert fields['src'] == 0x00 + += j1939_to_can_id round-trip +can_id = j1939_to_can_id(6, 0, 0, 0xFE, 0xCA, 0x00) +assert can_id == (0x18FECA00 & 0x1FFFFFFF) + += can_id_to_j1939 / j1939_to_can_id round-trip +for priority in [0, 3, 6, 7]: + for dp in [0, 1]: + for pf in [0x00, 0xEA, 0xF0, 0xFE, 0xFF]: + for ps in [0x00, 0xFF]: + for sa in [0x00, 0x0B, 0xFE]: + cid = j1939_to_can_id(priority, 0, dp, pf, ps, sa) + fields = can_id_to_j1939(cid) + assert fields['priority'] == priority + assert fields['data_page'] == dp + assert fields['pdu_format'] == pf + assert fields['pdu_specific'] == ps + assert fields['src'] == sa + += pgn_from_fields – PDU2 frame (PF >= 240) +# PF=0xFE (254), PS=0xCA → PGN includes PS +pgn = pgn_from_fields(0, 0xFE, 0xCA) +assert pgn == 0xFECA + += pgn_from_fields – PDU2 frame with data_page=1 +pgn = pgn_from_fields(1, 0xFE, 0xCA) +assert pgn == 0x1FECA + += pgn_from_fields – PDU1 frame (PF < 240) +# PF=0xEC (236), PS=0x00 (destination, not part of PGN) +pgn = pgn_from_fields(0, 0xEC, 0x00) +assert pgn == 0xEC00 + += pgn_from_fields – PDU1 PGN does not include PS +pgn1 = pgn_from_fields(0, 0xEB, 0x00) +pgn2 = pgn_from_fields(0, 0xEB, 0xFF) +assert pgn1 == pgn2, "PDU1 PGN must not depend on pdu_specific" + += dst_from_fields – PDU1 yields destination address +dst = dst_from_fields(0xEC, 0x42) +assert dst == 0x42 + += dst_from_fields – PDU2 yields broadcast +dst = dst_from_fields(0xFE, 0xCA) +assert dst == J1939_NO_ADDR # 0xFF + += dst_from_fields – PDU1/PDU2 boundary: PF=0xEF (239) is still PDU1 +# PF=0xEF is the last PDU1 value (< 240); pdu_specific is the destination address +dst = dst_from_fields(0xEF, 0x42) +assert dst == 0x42, "PF=0xEF (239) must be PDU1 – PS is destination address" + += dst_from_fields – PDU1/PDU2 boundary: PF=0xF0 (240) is the first PDU2 value +# PF=0xF0 is the first PDU2 value (>= 240); no peer-to-peer destination +dst = dst_from_fields(0xF0, 0x04) +assert dst == J1939_NO_ADDR, "PF=0xF0 (240) must be PDU2 – broadcast only" + + +############ +############ ++ j1939_to_can_id / can_id_to_j1939 boundary values +# Inspired by TruckDevil test_j1939_fields_to_can_id_param + += j1939_to_can_id – all-zeros produces CAN ID 0 +can_id = j1939_to_can_id(0, 0, 0, 0, 0, 0) +assert can_id == 0, "All-zero fields must produce CAN ID 0" + += j1939_to_can_id – all-max produces 29-bit all-ones (0x1FFFFFFF) +can_id = j1939_to_can_id(7, 1, 1, 0xFF, 0xFF, 0xFF) +assert can_id == 0x1FFFFFFF, "Max fields must produce 0x1FFFFFFF" + += j1939_to_can_id – TP.CM frame (priority=3, PF=0xEC, PS=0x00, SA=0x0B → 0x0CEC000B) +# Inspired by TruckDevil: j1939_fields_to_can_id(3, 0, 0, 0xEC, 0x00, 0x0B) == 0x0CEC000B +can_id = j1939_to_can_id(3, 0, 0, 0xEC, 0x00, 0x0B) +assert can_id == 0x0CEC000B, "TP.CM PDU1 frame CAN ID mismatch: got 0x%08X" % can_id +fields = can_id_to_j1939(can_id) +assert fields['priority'] == 3 +assert fields['pdu_format'] == 0xEC +assert fields['pdu_specific'] == 0x00 +assert fields['src'] == 0x0B + += j1939_to_can_id – Request PGN (PGN 0xEA00, SA=0xFF → 0x18EA00FF) +# Standard J1939 Request frame; sender is SA=0xFF, destination is encoded in PS=0x00 +# 0x18EA00FF: priority=6, r=0, dp=0, PF=0xEA, PS=0x00, SA=0xFF +can_id = j1939_to_can_id(6, 0, 0, 0xEA, 0x00, 0xFF) +assert can_id == 0x18EA00FF & 0x1FFFFFFF +fields = can_id_to_j1939(can_id) +assert fields['priority'] == 6 +assert fields['pdu_format'] == 0xEA +assert fields['pdu_specific'] == 0x00 +assert fields['src'] == 0xFF + + +############ +############ ++ pgn_from_fields additional coverage +# Inspired by TruckDevil test_j1939_message_pgn_destination_specific_vs_broadcast + += pgn_from_fields – PDU2 with group extension exactly at 0xF0 (TruckDevil: 0x18FEF000 → 0xFEF0) +# CAN ID 0x18FEF000: priority=6, r=0, dp=0, PF=0xFE (254), PS=0xF0, SA=0x00 +# PF >= 240 → PDU2 → PGN includes PS: (0 << 16) | (0xFE << 8) | 0xF0 = 0xFEF0 +pgn = pgn_from_fields(0, 0xFE, 0xF0) +assert pgn == 0xFEF0, "PDU2 PGN must include group-extension PS; got 0x%05X" % pgn + += pgn_from_fields – PDU1 PGN masks out destination address (TruckDevil: 0x00EC0B00 → 0xEC00) +# CAN ID 0x00EC0B00: priority=0, r=0, dp=0, PF=0xEC (236), PS=0x0B, SA=0x00 +# PF < 240 → PDU1 → PS is destination address, not part of PGN +pgn = pgn_from_fields(0, 0xEC, 0x0B) +assert pgn == 0xEC00, "PDU1 PGN must not include destination address; got 0x%05X" % pgn + += pgn_from_fields – EEC1 (Electronic Engine Controller 1, PGN 61444 = 0xF004) +# J1939 PGN 61444 (0xF004): dp=0, PF=0xF0, PS=0x04 – broadcast +pgn = pgn_from_fields(0, 0xF0, 0x04) +assert pgn == 0xF004, "EEC1 PGN should be 0xF004; got 0x%05X" % pgn + + +############ +############ ++ J1939_CAN with well-known realistic CAN IDs +# Inspired by TruckDevil test_j1939_message_creation_and_properties + += J1939_CAN – Request PGN frame (CAN ID 0x18EA00FF) +# CAN ID 0x18EA00FF: priority=6, r=0, dp=0, PF=0xEA, PS=0x00, SA=0xFF +raw_req = bytes.fromhex('98EA00FF08000000AABBCCDDEEFF0011') +p_req = J1939_CAN(raw_req) +assert p_req.priority == 6 +assert p_req.pdu_format == 0xEA +assert p_req.pdu_specific == 0x00 +assert p_req.src == 0xFF +assert p_req.pgn == 0xEA00 # PDU1: PS is destination, not in PGN +assert p_req.dst == 0x00 # destination address is PS=0x00 + += J1939_CAN – EEC1 Engine Speed frame (CAN ID 0x18F00400, PGN 0xF004) +# CAN ID 0x18F00400: priority=6, r=0, dp=0, PF=0xF0, PS=0x04, SA=0x00 +# Typical engine controller broadcast; SA=0x00 = Engine #1 +raw_eec1 = bytes.fromhex('98F0040008000000F87D7D000000F07D') +p_eec1 = J1939_CAN(raw_eec1) +assert p_eec1.priority == 6 +assert p_eec1.pdu_format == 0xF0 +assert p_eec1.pdu_specific == 0x04 +assert p_eec1.src == 0x00 +assert p_eec1.pgn == 0xF004 # PDU2: PGN includes group-extension PS +assert p_eec1.dst == J1939_NO_ADDR # broadcast + += J1939_CAN – TP.CM PDU1 frame (CAN ID 0x0CEC000B, priority=3) +# priority=3, r=0, dp=0, PF=0xEC (236), PS=0x00, SA=0x0B +# 32-bit header word (big-endian) = flags[3]+priority[3]+r[1]+dp[1]+PF[8]+PS[8]+SA[8] +# = 0b011 (flags=extended) | 011 (priority=3) | 0 (r) | 0 (dp) | 0xEC | 0x00 | 0x0B +# → 0x6CEC000B +raw_tp_cm = bytes.fromhex('6CEC000B08000000' + '20100000FFFF0300') +p_tp_cm = J1939_CAN(raw_tp_cm) +assert p_tp_cm.priority == 3, "priority=%d" % p_tp_cm.priority +assert p_tp_cm.pdu_format == 0xEC +assert p_tp_cm.pdu_specific == 0x00 +assert p_tp_cm.src == 0x0B +assert p_tp_cm.pgn == 0xEC00 # PDU1: PS=0x00 is destination, not in PGN +assert p_tp_cm.dst == 0x00 + + +############ +############ ++ J1939 application-layer packet tests + += J1939 default creation +p = J1939() +assert p.data == b'' +assert p.priority == 6 +assert p.pgn == 0 +assert p.src == J1939_NO_ADDR +assert p.dst == J1939_NO_ADDR +assert bytes(p) == b'' + += J1939 creation with data bytes +p = J1939(b'\x01\x02\x03') +assert p.data == b'\x01\x02\x03' +assert bytes(p) == b'\x01\x02\x03' + += J1939 creation with all metadata +p = J1939(b'\xAA\xBB', priority=3, pgn=0xFECA, src=0x00, dst=0xFF) +assert p.priority == 3 +assert p.pgn == 0xFECA +assert p.src == 0x00 +assert p.dst == 0xFF +assert p.data == b'\xAA\xBB' + += J1939 addresses are not in wire bytes +p = J1939(b'\x01', priority=3, pgn=0xFECA, src=0x00, dst=0xFF) +assert bytes(p) == b'\x01' + += J1939 answers +p = J1939(b'\x01\x02', pgn=0xFECA) +r = J1939(b'\x01\x02', pgn=0xFECA) +assert p.answers(r) + += J1939 does not answer different data +p = J1939(b'\x01') +r = J1939(b'\x02') +assert not p.answers(r) + += J1939 does not answer non-J1939 +p = J1939(b'\x01') +from scapy.packet import Raw +assert not p.answers(Raw(b'\x01')) + + +############ +############ ++ J1939_CAN packet tests + += J1939_CAN default packet creation +p = J1939_CAN() +assert p.priority == 6 +assert p.reserved == 0 +assert p.data_page == 0 +assert p.pdu_format == 0xFE +assert p.pdu_specific == 0xFF +assert p.src == 0xFE + += J1939_CAN byte layout is identical to CAN +# Build manually to verify first-4-byte identity +p = J1939_CAN( + priority=6, reserved=0, data_page=0, + pdu_format=0xFE, pdu_specific=0xCA, src=0x00, + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' +) +b = bytes(p) +# First 4 bytes encode flags(3)+J1939 fields(29) +# flags=extended=0b100, then priority=6(0b110), r=0, dp=0, pf=0xFE, ps=0xCA, sa=0x00 +# The 32-bit word (big-endian) = 0b100_110_0_0 | 0xFE | 0xCA | 0x00 +# = 0b10011000 | 0xFE | 0xCA | 0x00 = 0x98FECA00 +import struct +word = struct.unpack('>I', b[:4])[0] +assert word == 0x98FECA00, "word=0x%08X" % word + += J1939_CAN wire layout matches CAN +from scapy.layers.can import CAN +can_id_val = j1939_to_can_id(6, 0, 0, 0xFE, 0xCA, 0x00) +p_can = CAN(flags=0b100, identifier=can_id_val, + data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +p_j = J1939_CAN( + priority=6, reserved=0, data_page=0, + pdu_format=0xFE, pdu_specific=0xCA, src=0x00, + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' +) +assert bytes(p_can) == bytes(p_j), \ + "CAN bytes: %r\nJ1939_CAN bytes: %r" % (bytes(p_can), bytes(p_j)) + += J1939_CAN from_can conversion +can_id_val = j1939_to_can_id(6, 0, 0, 0xFE, 0xCA, 0x00) +p_can = CAN(flags=0b100, identifier=can_id_val, data=b'\x01\x02') +p_j = J1939_CAN.from_can(p_can) +assert p_j.priority == 6 +assert p_j.pdu_format == 0xFE +assert p_j.pdu_specific == 0xCA +assert p_j.src == 0x00 +assert p_j.data == b'\x01\x02' + += J1939_CAN to_can conversion +p_j = J1939_CAN( + priority=6, reserved=0, data_page=0, + pdu_format=0xFE, pdu_specific=0xCA, src=0x00, + data=b'\x01\x02' +) +p_can = p_j.to_can() +assert p_can.identifier == j1939_to_can_id(6, 0, 0, 0xFE, 0xCA, 0x00) +assert p_can.data == b'\x01\x02' + += J1939_CAN pgn property – PDU2 +p = J1939_CAN(pdu_format=0xFE, pdu_specific=0xCA) +assert p.pgn == 0xFECA + += J1939_CAN pgn property – PDU1 +p = J1939_CAN(pdu_format=0xEC, pdu_specific=0x00) +assert p.pgn == 0xEC00 + += J1939_CAN pgn property – PDU1 (PS is DA, not in PGN) +p1 = J1939_CAN(pdu_format=0xEB, pdu_specific=0x00) +p2 = J1939_CAN(pdu_format=0xEB, pdu_specific=0xFF) +assert p1.pgn == p2.pgn == 0xEB00 + += J1939_CAN dst property – PDU1 +p = J1939_CAN(pdu_format=0xEC, pdu_specific=0x42) +assert p.dst == 0x42 + += J1939_CAN dst property – PDU2 (broadcast) +p = J1939_CAN(pdu_format=0xFE, pdu_specific=0xCA) +assert p.dst == J1939_NO_ADDR + += J1939_CAN round-trip build and dissect +p = J1939_CAN( + priority=6, reserved=0, data_page=0, + pdu_format=0xFE, pdu_specific=0xCA, src=0x00, + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' +) +p2 = J1939_CAN(bytes(p)) +assert p2.priority == p.priority +assert p2.pdu_format == p.pdu_format +assert p2.pdu_specific == p.pdu_specific +assert p2.src == p.src +assert p2.data == p.data + += J1939_CAN dissect known CAN ID 0x18FECA00 (Engine Speed, broadcast) +# 0x18FECA00 & 0x1FFFFFFF as 29-bit = 0x18FECA00 & 0x1FFFFFFF +# flags=extended=4 → 0b100 +# Let's build from raw bytes +raw_frame = dhex('98FECA00' + '08' + '000000' + 'FF FF FF FF FF FF FF FF') +p = J1939_CAN(raw_frame) +assert p.priority == 6 +assert p.pdu_format == 0xFE +assert p.pdu_specific == 0xCA +assert p.src == 0x00 +assert p.data == b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' +assert p.pgn == 0xFECA + += J1939_CAN dissect PDU1 frame (peer-to-peer, PGN 0xEC00 TP.CM) +# PGN 0xEC00 (TP.CM), src=0x00, dst=0xFF +raw_frame = dhex('9800FF00' + '08' + '000000' + '20 14 00 03 FF CA FE 00') +p = J1939_CAN(raw_frame) +assert p.priority == 6, "priority=%d" % p.priority +assert p.pdu_format == 0x00 +assert p.pdu_specific == 0xFF # destination +assert p.src == 0x00 +assert p.pgn == 0x0000 +assert p.dst == 0xFF + + +############ +############ ++ J1939 TP Connection Management tests + += J1939_TP_CM_BAM build +bam = J1939_TP_CM_BAM(total_size=20, num_packets=3, pgn=0xFECA) +b = bytes(bam) +assert b[0] == J1939_TP_CTRL_BAM # ctrl = 32 +assert b[1] == 20 # total_size low byte (LE) +assert b[2] == 0 # total_size high byte (LE) +assert b[3] == 3 # num_packets +assert b[4] == 0xFF # reserved +# PGN 0x00FECA stored little-endian: 0xCA, 0xFE, 0x00 +assert b[5] == 0xCA +assert b[6] == 0xFE +assert b[7] == 0x00 + += J1939_TP_CM_BAM dissect +raw_bam = bytes([32, 20, 0, 3, 0xFF, 0xCA, 0xFE, 0x00]) +p = J1939_TP_CM.dispatch_hook(raw_bam) +p = p(raw_bam) +assert isinstance(p, J1939_TP_CM_BAM) +assert p.ctrl == J1939_TP_CTRL_BAM +assert p.total_size == 20 +assert p.num_packets == 3 +assert p.pgn == 0xFECA + += J1939_TP_CM_RTS build and dissect +rts = J1939_TP_CM_RTS(total_size=100, num_packets=15, max_packets=0xFF, pgn=0xEF00) +b = bytes(rts) +assert b[0] == J1939_TP_CTRL_RTS +p = J1939_TP_CM_RTS(b) +assert p.total_size == 100 +assert p.num_packets == 15 +assert p.max_packets == 0xFF +assert p.pgn == 0xEF00 + += J1939_TP_CM_CTS build and dissect +cts = J1939_TP_CM_CTS(num_packets=5, next_packet=1, pgn=0xEF00) +b = bytes(cts) +assert b[0] == J1939_TP_CTRL_CTS +p = J1939_TP_CM_CTS(b) +assert p.num_packets == 5 +assert p.next_packet == 1 +assert p.pgn == 0xEF00 + += J1939_TP_CM_ACK build and dissect +ack = J1939_TP_CM_ACK(total_size=100, num_packets=15, pgn=0xEF00) +b = bytes(ack) +assert b[0] == J1939_TP_CTRL_ACK +p = J1939_TP_CM_ACK(b) +assert p.total_size == 100 +assert p.num_packets == 15 +assert p.pgn == 0xEF00 + += J1939_TP_CM_ABORT build and dissect +abort = J1939_TP_CM_ABORT(reason=3, pgn=0xEF00) +b = bytes(abort) +assert b[0] == J1939_TP_CTRL_ABORT +p = J1939_TP_CM_ABORT(b) +assert p.reason == 3 +assert p.pgn == 0xEF00 + += J1939_TP_CM dispatch_hook selects correct subclass +assert J1939_TP_CM.dispatch_hook(bytes([J1939_TP_CTRL_RTS])) == J1939_TP_CM_RTS +assert J1939_TP_CM.dispatch_hook(bytes([J1939_TP_CTRL_CTS])) == J1939_TP_CM_CTS +assert J1939_TP_CM.dispatch_hook(bytes([J1939_TP_CTRL_ACK])) == J1939_TP_CM_ACK +assert J1939_TP_CM.dispatch_hook(bytes([J1939_TP_CTRL_BAM])) == J1939_TP_CM_BAM +assert J1939_TP_CM.dispatch_hook(bytes([J1939_TP_CTRL_ABORT])) == J1939_TP_CM_ABORT + += J1939_TP_DT build and dissect +dt = J1939_TP_DT(seq_num=1, data=b'\x01\x02\x03\x04\x05\x06\x07') +b = bytes(dt) +assert len(b) == 8 +assert b[0] == 1 +assert b[1:8] == b'\x01\x02\x03\x04\x05\x06\x07' + += J1939_TP_DT round-trip +dt = J1939_TP_DT(seq_num=7, data=b'\xAA\xBB\xCC\xDD\xEE\xFF\x11') +p = J1939_TP_DT(bytes(dt)) +assert p.seq_num == 7 +assert p.data == b'\xAA\xBB\xCC\xDD\xEE\xFF\x11' + + +############ +############ ++ J1939 TP Communication Commands – extended unit tests + += J1939_TP_CM_RTS default field values +rts = J1939_TP_CM_RTS() +assert rts.ctrl == J1939_TP_CTRL_RTS, "ctrl=%d" % rts.ctrl +assert rts.max_packets == 0xFF, "max_packets=%d" % rts.max_packets +assert len(bytes(rts)) == 8, "RTS wire size must be 8 bytes" + += J1939_TP_CM_RTS PGN stored little-endian in wire bytes +# PGN 0x1FECA → LE bytes: 0xCA, 0xFE, 0x01 +rts = J1939_TP_CM_RTS(total_size=100, num_packets=15, pgn=0x1FECA) +b = bytes(rts) +assert b[5] == 0xCA, "b[5]=0x%02X" % b[5] +assert b[6] == 0xFE, "b[6]=0x%02X" % b[6] +assert b[7] == 0x01, "b[7]=0x%02X" % b[7] + += J1939_TP_CM_RTS max_packets field round-trip +for maxp in [1, 5, 0xFE, 0xFF]: + rts = J1939_TP_CM_RTS(total_size=100, num_packets=15, max_packets=maxp, pgn=0xFECA) + p = J1939_TP_CM_RTS(bytes(rts)) + assert p.max_packets == maxp, "maxp=%d roundtrip=%d" % (maxp, p.max_packets) + += J1939_TP_CM_CTS reserved bytes are 0xFFFF by default +cts = J1939_TP_CM_CTS(num_packets=5, next_packet=1, pgn=0xFECA) +b = bytes(cts) +assert b[3] == 0xFF, "reserved high byte=0x%02X" % b[3] +assert b[4] == 0xFF, "reserved low byte=0x%02X" % b[4] + += J1939_TP_CM_CTS next_packet field round-trip +for nxt in [1, 4, 7, 255]: + cts = J1939_TP_CM_CTS(num_packets=3, next_packet=nxt, pgn=0xFECA) + p = J1939_TP_CM_CTS(bytes(cts)) + assert p.next_packet == nxt, "next_packet=%d roundtrip=%d" % (nxt, p.next_packet) + += J1939_TP_CM_ACK reserved byte is 0xFF by default +ack = J1939_TP_CM_ACK(total_size=100, num_packets=15, pgn=0xEF00) +b = bytes(ack) +assert b[4] == 0xFF, "reserved=0x%02X" % b[4] +assert len(b) == 8, "ACK wire size must be 8 bytes" + += J1939_TP_CM_ABORT default reserved bytes +abort = J1939_TP_CM_ABORT() +b = bytes(abort) +assert b[0] == J1939_TP_CTRL_ABORT, "ctrl=0x%02X" % b[0] +assert b[2] == 0xFF and b[3] == 0xFF, "reserved word not 0xFFFF" +assert b[4] == 0xFF, "reserved2=0x%02X" % b[4] +assert len(b) == 8, "ABORT wire size must be 8 bytes" + += J1939_TP_CM_ABORT reason field round-trip for all defined codes +for reason in [1, 2, 3, 4, 5, 6, 7, 8, 250, 251, 252, 253, 254, 255]: + abort = J1939_TP_CM_ABORT(reason=reason, pgn=0xFECA) + p = J1939_TP_CM_ABORT(bytes(abort)) + assert p.reason == reason, "reason=%d roundtrip=%d" % (reason, p.reason) + += J1939_TP_DT default padding is 0xFF +dt = J1939_TP_DT() +assert dt.data == b'\xff' * 7, "default data=%r" % dt.data +assert dt.seq_num == 1, "default seq_num=%d" % dt.seq_num + += J1939_TP_DT three-packet session assembles original payload +# 20-byte payload split across 3 TP.DT frames (7+7+6+pad) +tx_payload = bytes(range(0x01, 0x15)) # 20 bytes +dt1 = J1939_TP_DT(seq_num=1, data=tx_payload[0:7]) +dt2 = J1939_TP_DT(seq_num=2, data=tx_payload[7:14]) +dt3 = J1939_TP_DT(seq_num=3, data=tx_payload[14:20] + b'\xFF') # 1 pad byte +assert dt1.seq_num == 1 +assert dt2.seq_num == 2 +assert dt3.seq_num == 3 +reassembled = dt1.data + dt2.data + dt3.data +assert reassembled[:len(tx_payload)] == tx_payload, \ + "reassembled=%r expected=%r" % (reassembled[:20], tx_payload) + += J1939_TP_CM dispatch_hook returns J1939_TP_CM for empty or unknown ctrl +# empty bytes +assert J1939_TP_CM.dispatch_hook(b'') is J1939_TP_CM +# None +assert J1939_TP_CM.dispatch_hook(None) is J1939_TP_CM +# unknown ctrl bytes (0x00, 18, 100) +assert J1939_TP_CM.dispatch_hook(bytes([0x00])) is J1939_TP_CM +assert J1939_TP_CM.dispatch_hook(bytes([18])) is J1939_TP_CM +assert J1939_TP_CM.dispatch_hook(bytes([100])) is J1939_TP_CM + += Complete BAM session – exact wire bytes for BAM and three TP.DT frames +tx_payload = bytes(range(0x01, 0x15)) # 20 bytes: 0x01..0x14 +# BAM: ctrl=32, total_size=20 (LE: 20,0), num_packets=3, reserved=0xFF, PGN 0xFECA (LE: 0xCA,0xFE,0x00) +bam = J1939_TP_CM_BAM(total_size=20, num_packets=3, pgn=0xFECA) +assert bytes(bam) == bytes([32, 20, 0, 3, 0xFF, 0xCA, 0xFE, 0x00]) +# DT frames +dt1 = J1939_TP_DT(seq_num=1, data=tx_payload[0:7]) +dt2 = J1939_TP_DT(seq_num=2, data=tx_payload[7:14]) +dt3 = J1939_TP_DT(seq_num=3, data=tx_payload[14:20] + b'\xFF') +assert len(bytes(dt1)) == len(bytes(dt2)) == len(bytes(dt3)) == 8 +assert bytes(dt1) == bytes([1]) + tx_payload[0:7] +assert bytes(dt2) == bytes([2]) + tx_payload[7:14] +assert bytes(dt3) == bytes([3]) + tx_payload[14:20] + b'\xFF' + += RTS/CTS/ACK handshake – wire byte verification +# RTS: ctrl=16, total_size=18 (LE), num_packets=3, max_packets=1, PGN 0xEF00 (LE) +rts = J1939_TP_CM_RTS(total_size=18, num_packets=3, max_packets=1, pgn=0xEF00) +b = bytes(rts) +assert b[0] == J1939_TP_CTRL_RTS +assert b[1] == 18 and b[2] == 0 # total_size=18 LE +assert b[3] == 3 # num_packets +assert b[4] == 1 # max_packets +assert b[5] == 0x00 and b[6] == 0xEF and b[7] == 0x00 # PGN 0xEF00 LE + +# CTS: ctrl=17, num_packets=1, next_packet=1, reserved=0xFFFF, PGN 0xEF00 (LE) +cts = J1939_TP_CM_CTS(num_packets=1, next_packet=1, pgn=0xEF00) +b = bytes(cts) +assert b[0] == J1939_TP_CTRL_CTS +assert b[1] == 1 # num_packets +assert b[2] == 1 # next_packet +assert b[3] == 0xFF and b[4] == 0xFF # reserved=0xFFFF +assert b[5] == 0x00 and b[6] == 0xEF and b[7] == 0x00 # PGN 0xEF00 LE + +# ACK: ctrl=19, total_size=18 (LE), num_packets=3, reserved=0xFF, PGN 0xEF00 (LE) +ack = J1939_TP_CM_ACK(total_size=18, num_packets=3, pgn=0xEF00) +b = bytes(ack) +assert b[0] == J1939_TP_CTRL_ACK +assert b[1] == 18 and b[2] == 0 # total_size=18 LE +assert b[3] == 3 # num_packets +assert b[4] == 0xFF # reserved +assert b[5] == 0x00 and b[6] == 0xEF and b[7] == 0x00 # PGN 0xEF00 LE + + +############ +############ ++ J1939_CAN inherits from CAN + += J1939_CAN is a subclass of CAN +from scapy.layers.can import CAN +assert issubclass(J1939_CAN, CAN) + += J1939_CAN dispatch_hook always returns J1939_CAN +raw_j1939_frame = bytes.fromhex('98feca0008000000ffffffffffffffff') +assert J1939_CAN.dispatch_hook(raw_j1939_frame) is J1939_CAN + += J1939_CAN instantiation does not redirect to CAN +pkt = J1939_CAN(raw_j1939_frame) +assert isinstance(pkt, J1939_CAN), "Expected J1939_CAN, got %s" % type(pkt).__name__ +assert pkt.pgn == 0xFECA +assert pkt.src == 0x00 + += J1939_CAN inherits pre_dissect / post_build (swap-bytes) +import struct as _struct +from scapy.config import conf as _conf + +# Round-trip: build with swap-bytes=True (post_build swaps), parse with swap-bytes=True (pre_dissect unswaps) +_conf.contribs['CAN']['swap-bytes'] = False +raw_unswapped = bytes(J1939_CAN(priority=6, data_page=0, pdu_format=0xFE, pdu_specific=0xCA, src=0x00)) + +_conf.contribs['CAN']['swap-bytes'] = True +p_sw = J1939_CAN(priority=6, data_page=0, pdu_format=0xFE, pdu_specific=0xCA, + src=0x00, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +raw_sw = bytes(p_sw) +# post_build (swap-bytes=True) must produce bytes different from the unswapped encoding +assert raw_sw[:4] != raw_unswapped[:4], "post_build should have swapped first 4 bytes" +# pre_dissect (swap-bytes=True) unswaps → same field values as if built with swap-bytes=False +p_rt = J1939_CAN(raw_sw) +assert p_rt.pdu_format == 0xFE, "pdu_format=%d" % p_rt.pdu_format +assert p_rt.src == 0x00 +_conf.contribs['CAN']['swap-bytes'] = False + += J1939_CAN inherits extract_padding (remove-padding) +_conf.contribs['CAN']['remove-padding'] = True +p_padded = J1939_CAN(raw_j1939_frame + b'\x00\x00\x00\x00') +assert p_padded.data == b'\xff' * 8, "Padding should have been stripped" + += J1939_CAN to_can produces a plain CAN packet with identical bytes +pj = J1939_CAN(priority=6, data_page=0, pdu_format=0xFE, pdu_specific=0xCA, + src=0x00, data=b'\xDE\xAD') +pc = pj.to_can() +assert isinstance(pc, CAN) +assert bytes(pc) == bytes(pj) + += J1939_CAN from_can produces a J1939_CAN packet with identical bytes +can_id_val = j1939_to_can_id(6, 0, 0, 0xFE, 0xCA, 0x00) +pc2 = CAN(flags=0b100, identifier=can_id_val, data=b'\xDE\xAD') +pj2 = J1939_CAN.from_can(pc2) +assert isinstance(pj2, J1939_CAN) +assert bytes(pj2) == bytes(pc2) +assert pj2.pgn == 0xFECA +assert pj2.src == 0x00 + += J1939_CAN to_can / from_can are inverse of each other +pj_orig = J1939_CAN(priority=6, data_page=0, pdu_format=0xEC, pdu_specific=0x42, + src=0x11, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +pj_rt = J1939_CAN.from_can(pj_orig.to_can()) +assert pj_rt.pdu_format == pj_orig.pdu_format +assert pj_rt.pdu_specific == pj_orig.pdu_specific +assert pj_rt.src == pj_orig.src +assert pj_rt.data == pj_orig.data +assert pj_rt.pgn == pj_orig.pgn +assert pj_rt.dst == pj_orig.dst + + +############ +############ ++ rdpcap with J1939_CAN frames + += rdpcap reads J1939 CAN frames as CAN packets +from io import BytesIO +from scapy.utils import rdpcap +from scapy.config import conf as _conf +_conf.contribs['CAN']['swap-bytes'] = False +_conf.contribs['CAN']['remove-padding'] = True + +# Deterministic PCAP (DLT_CAN_SOCKETCAN = 0xe3) containing two J1939 CAN frames: +# Frame 1: PGN 0xFECA (broadcast, SA=0x00) data=FF*8 +# Frame 2: PGN 0xEC00 (TP.CM BAM to 0xFF, SA=0x00) +j1939_pcap_bytes = ( + b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xff\xff\x00\x00\xe3\x00\x00\x00' + b'\xe8\x03\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00\x10\x00\x00\x00' + b'\x98\xfe\xca\x00\x08\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff' + b'\xe9\x03\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00\x10\x00\x00\x00' + b'\x98\xec\xff\x00\x08\x00\x00\x00\x20\x14\x00\x03\xff\xca\xfe\x00' +) +pkts = rdpcap(BytesIO(j1939_pcap_bytes)) +assert len(pkts) == 2 +assert all(CAN in p for p in pkts), "All packets should have a CAN layer" + += rdpcap J1939 frame 1: PGN 0xFECA broadcast +j1 = J1939_CAN.from_can(pkts[0]) +assert isinstance(j1, J1939_CAN) +assert j1.pgn == 0xFECA, "Expected PGN 0xFECA, got 0x%05X" % j1.pgn +assert j1.src == 0x00 +assert j1.dst == J1939_NO_ADDR, "PDU2 frame must have broadcast destination" +assert j1.data == b'\xff' * 8 + += rdpcap J1939 frame 2: TP.CM BAM (PGN 0xEC00, DA=0xFF) +j2 = J1939_CAN.from_can(pkts[1]) +assert isinstance(j2, J1939_CAN) +assert j2.pgn == 0xEC00, "Expected PGN 0xEC00, got 0x%05X" % j2.pgn +assert j2.dst == 0xFF, "PDU1 destination should be 0xFF" +assert j2.src == 0x00 +assert j2.data == bytes([32, 20, 0, 3, 0xFF, 0xCA, 0xFE, 0x00]) + += wrpcap / rdpcap round-trip with J1939_CAN +import tempfile as _tempfile, os as _os +from scapy.utils import wrpcap + +# Build two J1939_CAN frames and write them as CAN (to_can) via wrpcap +p1 = J1939_CAN(priority=6, data_page=0, pdu_format=0xFE, pdu_specific=0xCA, + src=0x00, data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +p2 = J1939_CAN(priority=6, data_page=0, pdu_format=0xF0, pdu_specific=0x04, + src=0x0B, data=b'\x10\x20\x30\x40\x50\x60\x70\x80') + +_tf = _tempfile.NamedTemporaryFile(delete=False, suffix='.pcap') +_tf.close() +try: + wrpcap(_tf.name, [p1.to_can(), p2.to_can()]) + read_pkts = rdpcap(_tf.name) +finally: + _os.unlink(_tf.name) + +assert len(read_pkts) == 2 + +r1 = J1939_CAN.from_can(read_pkts[0]) +r2 = J1939_CAN.from_can(read_pkts[1]) + +assert r1.pgn == 0xFECA +assert r1.src == 0x00 +assert r1.data == b'\x01\x02\x03\x04\x05\x06\x07\x08' + +assert r2.pgn == 0xF004 +assert r2.src == 0x0B +assert r2.data == b'\x10\x20\x30\x40\x50\x60\x70\x80' + + +############ +############ ++ rdcandump with J1939_CAN frames + += rdcandump reads J1939 CAN frames as CAN packets +from io import BytesIO +from scapy.layers.can import rdcandump, CandumpReader + +# candump log-file format: (timestamp) interface canid#hexdata +# J1939 extended CAN IDs are 8 hex digits (>0x7FF) +# 18FECA00 → priority=6 dp=0 pf=0xFE ps=0xCA sa=0x00 (PGN 0xFECA) +# 18ECFF00 → priority=6 dp=0 pf=0xEC ps=0xFF sa=0x00 (PGN TP.CM, DA=0xFF) +j1939_candump = BytesIO(b'''(1539191392.761779) vcan0 18FECA00#FFFFFFFFFFFFFFFF +(1539191392.861779) vcan0 18ECFF00#20140003FFCAFE00 +''') + +pkts = rdcandump(j1939_candump) +assert len(pkts) == 2, "Expected 2 packets, got %d" % len(pkts) +# rdcandump treats 8-byte hex payload (16 hex chars) as CANFD in log format; +# CANFD inherits from CAN so isinstance check works +assert all(isinstance(p, CAN) for p in pkts) + += rdcandump J1939 frame 1: PGN 0xFECA (PDU2 broadcast) +j1 = J1939_CAN.from_can(pkts[0]) +assert isinstance(j1, J1939_CAN) +assert j1.pgn == 0xFECA, "Expected 0xFECA, got 0x%05X" % j1.pgn +assert j1.src == 0x00 +assert j1.dst == J1939_NO_ADDR +assert j1.data == b'\xff' * 8 +# from_can preserves the CAN packet's timestamp +assert abs(j1.time - 1539191392.761779) < 1e-3 + += rdcandump J1939 frame 2: TP.CM BAM (PDU1, DA=0xFF, PGN 0xEC00) +j2 = J1939_CAN.from_can(pkts[1]) +assert isinstance(j2, J1939_CAN) +assert j2.pgn == 0xEC00, "Expected 0xEC00, got 0x%05X" % j2.pgn +assert j2.dst == 0xFF +assert j2.src == 0x00 +assert j2.data == bytes([32, 20, 0, 3, 0xFF, 0xCA, 0xFE, 0x00]) +assert abs(j2.time - 1539191392.861779) < 1e-3 + += rdcandump non-log-file (column) format with J1939 extended frame +j1939_candump_col = BytesIO(b' vcan0 18FECA00 [8] FF FF FF FF FF FF FF FF\n') +pkts_col = rdcandump(j1939_candump_col) +assert len(pkts_col) == 1 +jc = J1939_CAN.from_can(pkts_col[0]) +assert jc.pgn == 0xFECA +assert jc.src == 0x00 +assert jc.data == b'\xff' * 8 + += CandumpReader iterable yields J1939 frames +j1939_log = BytesIO(b'''(1539191392.761779) vcan0 18FECA00#FFFFFFFFFFFFFFFF +(1539191392.861779) vcan0 18ECFF00#20140003FFCAFE00 +''') +j1939_frames = [J1939_CAN.from_can(pkt) for pkt in CandumpReader(j1939_log)] +assert len(j1939_frames) == 2 +assert j1939_frames[0].pgn == 0xFECA +assert j1939_frames[1].pgn == 0xEC00 + + +############ +############ ++ J1939 NativeJ1939Socket tests +~ vcan_socket needs_root + += Setup +import os +import threading +from time import sleep +from subprocess import call + +bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" +assert 0 == os.system(bashCommand) + += NativeJ1939Socket import +from scapy.contrib.j1939 import NativeJ1939Socket + += NativeJ1939Socket creation +sock = NativeJ1939Socket("vcan0", promisc=True) +sock.close() + += NativeJ1939Socket send and recv +import socket as _socket + +sender = NativeJ1939Socket("vcan0", src_addr=J1939_IDLE_ADDR, pgn=J1939_NO_PGN, promisc=False) +receiver = NativeJ1939Socket("vcan0", promisc=True) + +tx_data = b'\x01\x02\x03\x04' +tx_pgn = 0xFECA + +def _send(): + sleep(0.1) + msg = J1939(tx_data, pgn=tx_pgn, src=J1939_IDLE_ADDR, dst=0xFF) + sender.send(msg) + +t = threading.Thread(target=_send) +t.start() + +receiver.ins.settimeout(3.0) +rx = receiver.recv() +t.join() + +sender.close() +receiver.close() + +assert rx is not None, "No packet received" +assert rx.data == tx_data, "Data mismatch: %r != %r" % (rx.data, tx_data) +assert rx.pgn == tx_pgn, "PGN mismatch: 0x%X != 0x%X" % (rx.pgn, tx_pgn) +assert rx.src == J1939_IDLE_ADDR, "SA mismatch: %d" % rx.src + += NativeJ1939Socket long message sends TP.CM BAM + TP.DT to NativeCANSocket +# J1939 kernel stack auto-segments the payload; NativeCANSocket collects the raw CAN frames +from scapy.contrib.cansocket_native import NativeCANSocket + +tx_long_payload = bytes(range(0x01, 0x15)) # 20 bytes → 1 BAM + 3 TP.DT +tx_long_pgn = 0xFECA +tx_long_src = J1939_IDLE_ADDR # 0xFE: no address claiming needed + +j1939_sender_long = NativeJ1939Socket("vcan0", src_addr=tx_long_src, promisc=False) +can_receiver_long = NativeCANSocket("vcan0", basecls=J1939_CAN) +can_receiver_long.ins.settimeout(3.0) + +collected_frames = [] + +def _send_long(): + sleep(0.1) + msg = J1939(tx_long_payload, pgn=tx_long_pgn, src=tx_long_src, dst=0xFF) + j1939_sender_long.send(msg) + +t_long = threading.Thread(target=_send_long) +t_long.start() + +try: + for _ in range(4): # 1 BAM + 3 DT + f = can_receiver_long.recv() + if f is not None: + collected_frames.append(f) +except Exception: + pass + +t_long.join() +j1939_sender_long.close() +can_receiver_long.close() + +assert len(collected_frames) == 4, "Expected 4 CAN frames, got %d" % len(collected_frames) + +# Frame 0: TP.CM BAM (PF=0xEC) +bam_frame = collected_frames[0] +assert bam_frame.pdu_format == 0xEC, "Expected TP.CM, pf=0x%02X" % bam_frame.pdu_format +bam = J1939_TP_CM_BAM(bam_frame.data) +assert bam.ctrl == J1939_TP_CTRL_BAM +assert bam.total_size == len(tx_long_payload), \ + "total_size=%d expected=%d" % (bam.total_size, len(tx_long_payload)) +assert bam.num_packets == 3, "num_packets=%d" % bam.num_packets +assert bam.pgn == tx_long_pgn, "pgn=0x%X" % bam.pgn + +# Frames 1-3: TP.DT (PF=0xEB) in order; reassembled payload matches +dt_data = b'' +for i, frame in enumerate(collected_frames[1:], start=1): + assert frame.pdu_format == 0xEB, \ + "Frame %d: expected TP.DT, pf=0x%02X" % (i, frame.pdu_format) + dt = J1939_TP_DT(frame.data) + assert dt.seq_num == i, "seq_num=%d expected=%d" % (dt.seq_num, i) + dt_data += dt.data + +assert dt_data[:len(tx_long_payload)] == tx_long_payload, \ + "Reassembled payload mismatch: %r != %r" % (dt_data[:20], tx_long_payload) + += NativeCANSocket sends TP.CM BAM + TP.DT; NativeJ1939Socket receives reassembled message +# The kernel J1939 stack on the receiver side reassembles the incoming TP.CM/TP.DT sequence +from scapy.contrib.cansocket_native import NativeCANSocket + +tx_bam_payload = bytes(range(0x20, 0x34)) # 20 bytes +tx_bam_pgn = 0xFECA +tx_bam_src = 0x0B + +can_sender_bam = NativeCANSocket("vcan0") +j1939_receiver_bam = NativeJ1939Socket("vcan0", promisc=True) +j1939_receiver_bam.ins.settimeout(3.0) + +# Build BAM and TP.DT CAN frames manually +_bam_bytes = bytes(J1939_TP_CM_BAM(total_size=len(tx_bam_payload), num_packets=3, + pgn=tx_bam_pgn)) +_dt1_bytes = bytes(J1939_TP_DT(seq_num=1, data=tx_bam_payload[0:7])) +_dt2_bytes = bytes(J1939_TP_DT(seq_num=2, data=tx_bam_payload[7:14])) +_dt3_bytes = bytes(J1939_TP_DT(seq_num=3, data=tx_bam_payload[14:20] + b'\xFF')) + +_bam_can = J1939_CAN(priority=6, data_page=0, pdu_format=0xEC, pdu_specific=0xFF, + src=tx_bam_src, data=_bam_bytes) +_dt1_can = J1939_CAN(priority=7, data_page=0, pdu_format=0xEB, pdu_specific=0xFF, + src=tx_bam_src, data=_dt1_bytes) +_dt2_can = J1939_CAN(priority=7, data_page=0, pdu_format=0xEB, pdu_specific=0xFF, + src=tx_bam_src, data=_dt2_bytes) +_dt3_can = J1939_CAN(priority=7, data_page=0, pdu_format=0xEB, pdu_specific=0xFF, + src=tx_bam_src, data=_dt3_bytes) + +def _send_bam_sequence(): + sleep(0.1) + for frame in [_bam_can, _dt1_can, _dt2_can, _dt3_can]: + can_sender_bam.send(frame) + sleep(0.005) # 5 ms inter-frame gap + +t_bam = threading.Thread(target=_send_bam_sequence) +t_bam.start() + +rx_bam = j1939_receiver_bam.recv() +t_bam.join() +can_sender_bam.close() +j1939_receiver_bam.close() + +assert rx_bam is not None, "No reassembled J1939 message received" +assert rx_bam.data == tx_bam_payload, \ + "Payload mismatch: %r != %r" % (rx_bam.data, tx_bam_payload) +assert rx_bam.pgn == tx_bam_pgn, "PGN mismatch: 0x%X != 0x%X" % (rx_bam.pgn, tx_bam_pgn) +assert rx_bam.src == tx_bam_src, "SA mismatch: %d != %d" % (rx_bam.src, tx_bam_src) + += Teardown +pass # vcan0 is managed by the test infrastructure; do not delete it here