From b409ed5c30864649b4d310446286689870235b44 Mon Sep 17 00:00:00 2001 From: Nils Weiss Date: Fri, 10 Apr 2026 21:35:27 +0200 Subject: [PATCH] Introduce single-layer and compatibility modes for UDS, KWP, OBD, and GMLAN protocols; add documentation and tests. --- doc/scapy/layers/automotive.rst | 92 ++++++++ scapy/contrib/automotive/gm/gmlan.py | 130 +++++++++-- scapy/contrib/automotive/kwp.py | 198 ++++++++++++++-- scapy/contrib/automotive/obd/iid/iids.py | 10 +- scapy/contrib/automotive/obd/mid/mids.py | 10 +- scapy/contrib/automotive/obd/obd.py | 83 +++++-- scapy/contrib/automotive/obd/pid/pids.py | 8 +- .../contrib/automotive/obd/pid/pids_00_1F.py | 6 +- scapy/contrib/automotive/obd/services.py | 83 ++++++- scapy/contrib/automotive/obd/tid/tids.py | 8 +- scapy/contrib/automotive/uds.py | 213 +++++++++++++++--- test/contrib/automotive/gm/gmlan.uts | 126 ++++++++++- test/contrib/automotive/kwp.uts | 141 ++++++++++++ test/contrib/automotive/obd/obd.uts | 124 ++++++++++ test/contrib/automotive/uds.uts | 176 +++++++++++++++ 15 files changed, 1302 insertions(+), 106 deletions(-) diff --git a/doc/scapy/layers/automotive.rst b/doc/scapy/layers/automotive.rst index 6abee5daa66..a9fa9d3e107 100644 --- a/doc/scapy/layers/automotive.rst +++ b/doc/scapy/layers/automotive.rst @@ -1081,6 +1081,98 @@ to the Scapy interpreter:: .. image:: ../graphics/animations/animation-scapy-uds3.svg + +Single Layer Mode +----------------- + +UDS, KWP, OBD, and GMLAN all support a *single layer mode* that makes each +service packet a standalone ``Packet`` rather than a nested sublayer. + +**Default (multi-layer) mode** + +.. code-block:: python + + >>> pkt = UDS() / UDS_DSC(diagnosticSessionType=0x01) + >>> UDS(b'\x10\x01') + > + +**Single layer mode** + +To enable before loading a module:: + + >>> conf.contribs['UDS'] = {'treat-response-pending-as-answer': False, + ... 'single_layer_mode': True} + >>> load_contrib('automotive.uds') + +To toggle at runtime after loading:: + + >>> conf.contribs['UDS']['single_layer_mode'] = True + >>> UDS(b'\x10\x01') + + >>> bytes(UDS_DSC(diagnosticSessionType=0x01)) + b'\x10\x01' + >>> conf.contribs['UDS']['single_layer_mode'] = False # revert to multi-layer mode + +The same ``single_layer_mode`` key works for all protocols: replace ``'UDS'`` +with ``'KWP'``, ``'OBD'``, or ``'GMLAN'`` as appropriate. + +Compatibility Mode +------------------ + +Scapy allows crafting packets freely, including stacking a service sub-packet +on top of the base protocol layer (e.g. ``UDS()/UDS_DSC()``). When both +``single_layer_mode`` *and* stacking are used together, the ``service`` byte +would normally appear twice in the resulting byte stream – once from the base +layer and once from the sub-packet's own ``service`` ConditionalField. + +The **compatibility mode** flag (``compatibility_mode``, default ``True``) +addresses this: when it is enabled and ``single_layer_mode`` is active, the +sub-packet's ``service`` field is automatically **suppressed** whenever the +immediate underlayer is already the matching base-protocol packet. + +.. list-table:: Behaviour matrix + :header-rows: 1 + :widths: 25 25 50 + + * - ``single_layer_mode`` + - ``compatibility_mode`` + - ``UDS()/UDS_DSC()`` byte layout + * - ``False`` + - any + - ``service`` (UDS) + ``diagnosticSessionType`` (UDS_DSC) + * - ``True`` + - ``True`` *(default)* + - ``service`` (UDS) + ``diagnosticSessionType`` (UDS_DSC) — duplicate suppressed + * - ``True`` + - ``False`` + - ``service`` (UDS) + ``service`` (UDS_DSC) + ``diagnosticSessionType`` (UDS_DSC) + +Example with compatibility mode on (default):: + + >>> conf.contribs['UDS']['single_layer_mode'] = True + >>> conf.contribs['UDS']['compatibility_mode'] = True # already the default + + >>> # Standalone sub-packet: service field IS present (no UDS underlayer) + >>> bytes(UDS_DSC(diagnosticSessionType=0x01)) + b'\x10\x01' + + >>> # Stacked: service field in UDS_DSC is suppressed (UDS is the underlayer) + >>> bytes(UDS() / UDS_DSC(diagnosticSessionType=0x01)) + b'\x10\x01' + +Example with compatibility mode off:: + + >>> conf.contribs['UDS']['compatibility_mode'] = False + + >>> # Stacked: both UDS and UDS_DSC emit a service byte + >>> bytes(UDS() / UDS_DSC(diagnosticSessionType=0x01)) + b'\x10\x10\x01' + + >>> conf.contribs['UDS']['compatibility_mode'] = True # restore default + +The same ``compatibility_mode`` key works for all protocols: replace ``'UDS'`` +with ``'KWP'``, ``'OBD'``, or ``'GMLAN'`` as appropriate. + GMLAN ===== diff --git a/scapy/contrib/automotive/gm/gmlan.py b/scapy/contrib/automotive/gm/gmlan.py index 76c9cf110f1..62c88b208d7 100644 --- a/scapy/contrib/automotive/gm/gmlan.py +++ b/scapy/contrib/automotive/gm/gmlan.py @@ -32,6 +32,12 @@ from scapy.packet import Packet, bind_layers, NoPayload from scapy.config import conf from scapy.contrib.isotp import ISOTP +from scapy.compat import orb + +from typing import ( # noqa: F401 + Dict, + Type, +) """ GMLAN @@ -46,11 +52,38 @@ # "a negative response 'RequestCorrectlyReceived-" # "ResponsePending' as answer of a request. \n" # "The default value is False.") - conf.contribs['GMLAN'] = {'treat-response-pending-as-answer': False} + conf.contribs['GMLAN'] = {'treat-response-pending-as-answer': False, + 'single_layer_mode': False, + 'compatibility_mode': True} conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = None +def _gmlan_slm(pkt): + # type: (Packet) -> bool + """Return True when the service ConditionalField should be present. + + Two configuration keys in ``conf.contribs['GMLAN']`` control the behaviour: + + ``single_layer_mode`` (bool, default ``False``): + When *True*, :class:`GMLAN` acts as a dispatch layer and returns the + matching service sub-packet directly. Each sub-packet gains its own + ``service`` field so that it can be built and dissected stand-alone. + + ``compatibility_mode`` (bool, default ``True``): + Only relevant when ``single_layer_mode`` is *True*. When *True* the + ``service`` field is **suppressed** in a sub-packet whose immediate + underlayer is already a :class:`GMLAN` packet, preventing a duplicate + service byte when sub-packets are stacked (``GMLAN()/GMLAN_IDO()``). + Set to *False* to always emit the ``service`` byte from the sub-packet. + """ + if not conf.contribs['GMLAN'].get('single_layer_mode', False): + return False + if conf.contribs['GMLAN'].get('compatibility_mode', True): + return pkt.underlayer is None or not isinstance(pkt.underlayer, GMLAN) + return True + + class GMLAN(ISOTP): @staticmethod def determine_len(x): @@ -130,6 +163,17 @@ def hashret(self): return struct.pack('B', self.requestServiceId & ~0x40) return struct.pack('B', self.service & ~0x40) + _service_cls = {} # type: Dict[int, Type[Packet]] + + @classmethod + def dispatch_hook(cls, _pkt=b"", *args, **kwargs): + # type: (...) -> type + """Dispatch to the correct GMLAN service class in single layer mode.""" + if conf.contribs['GMLAN'].get('single_layer_mode', False) and len(_pkt) >= 1: + service = orb(_pkt[0]) + return cls._service_cls.get(service, cls) + return cls + # ########################IDO################################### class GMLAN_IDO(Packet): @@ -139,11 +183,13 @@ class GMLAN_IDO(Packet): 0x04: 'wakeUpLinks'} name = 'InitiateDiagnosticOperation' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x10, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions) ] bind_layers(GMLAN, GMLAN_IDO, service=0x10) +GMLAN._service_cls[0x10] = GMLAN_IDO # ########################RFRD################################### @@ -166,18 +212,21 @@ class GMLAN_RFRD(Packet): 0x02: 'readFailureRecordParameters'} name = 'ReadFailureRecordData' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x12, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions), - ConditionalField(PacketField("dtc", b'', GMLAN_DTC), + ConditionalField(PacketField("dtc", None, GMLAN_DTC), lambda pkt: pkt.subfunction == 0x02) ] bind_layers(GMLAN, GMLAN_RFRD, service=0x12) +GMLAN._service_cls[0x12] = GMLAN_RFRD class GMLAN_RFRDPR(Packet): name = 'ReadFailureRecordDataPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x52, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, GMLAN_RFRD.subfunctions) ] @@ -187,6 +236,7 @@ def answers(self, other): bind_layers(GMLAN, GMLAN_RFRDPR, service=0x52) +GMLAN._service_cls[0x52] = GMLAN_RFRDPR class GMLAN_RFRDPR_RFRI(Packet): @@ -208,7 +258,7 @@ class GMLAN_RFRDPR_RFRI(Packet): class GMLAN_RFRDPR_RFRP(Packet): name = 'ReadFailureRecordDataPositiveResponse_readFailureRecordParameters' fields_desc = [ - PacketField("dtc", b'', GMLAN_DTC) + PacketField("dtc", None, GMLAN_DTC) ] @@ -304,16 +354,19 @@ class GMLAN_RDBI(Packet): name = 'ReadDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x1a, GMLAN.services), _gmlan_slm), XByteEnumField('dataIdentifier', 0, dataIdentifiers) ] -bind_layers(GMLAN, GMLAN_RDBI, service=0x1A) +bind_layers(GMLAN, GMLAN_RDBI, service=0x1a) +GMLAN._service_cls[0x1a] = GMLAN_RDBI class GMLAN_RDBIPR(Packet): name = 'ReadDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x5a, GMLAN.services), _gmlan_slm), XByteEnumField('dataIdentifier', 0, GMLAN_RDBI.dataIdentifiers), ] @@ -322,7 +375,8 @@ def answers(self, other): other.dataIdentifier == self.dataIdentifier -bind_layers(GMLAN, GMLAN_RDBIPR, service=0x5A) +bind_layers(GMLAN, GMLAN_RDBIPR, service=0x5a) +GMLAN._service_cls[0x5a] = GMLAN_RDBIPR # ########################RDBI################################### @@ -334,6 +388,7 @@ class GMLAN_RDBPI(Packet): }) name = 'ReadDataByParameterIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x22, GMLAN.services), _gmlan_slm), FieldListField("identifiers", [], XShortEnumField('parameterIdentifier', 0, dataIdentifiers)) @@ -341,11 +396,13 @@ class GMLAN_RDBPI(Packet): bind_layers(GMLAN, GMLAN_RDBPI, service=0x22) +GMLAN._service_cls[0x22] = GMLAN_RDBPI class GMLAN_RDBPIPR(Packet): name = 'ReadDataByParameterIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x62, GMLAN.services), _gmlan_slm), XShortEnumField('parameterIdentifier', 0, GMLAN_RDBPI.dataIdentifiers), ] @@ -355,6 +412,7 @@ def answers(self, other): bind_layers(GMLAN, GMLAN_RDBPIPR, service=0x62) +GMLAN._service_cls[0x62] = GMLAN_RDBPIPR # ########################RDBPKTI################################### @@ -369,6 +427,7 @@ class GMLAN_RDBPKTI(Packet): } fields_desc = [ + ConditionalField(XByteEnumField('service', 0xaa, GMLAN.services), _gmlan_slm), XByteEnumField('subfunction', 0, subfunctions), ConditionalField(FieldListField('request_DPIDs', [], XByteField("", 0)), @@ -376,13 +435,15 @@ class GMLAN_RDBPKTI(Packet): ] -bind_layers(GMLAN, GMLAN_RDBPKTI, service=0xAA) +bind_layers(GMLAN, GMLAN_RDBPKTI, service=0xaa) +GMLAN._service_cls[0xaa] = GMLAN_RDBPKTI # ########################RMBA################################### class GMLAN_RMBA(Packet): name = 'ReadMemoryByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x23, GMLAN.services), _gmlan_slm), MultipleTypeField( [ (XShortField('memoryAddress', 0), @@ -398,11 +459,13 @@ class GMLAN_RMBA(Packet): bind_layers(GMLAN, GMLAN_RMBA, service=0x23) +GMLAN._service_cls[0x23] = GMLAN_RMBA class GMLAN_RMBAPR(Packet): name = 'ReadMemoryByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x63, GMLAN.services), _gmlan_slm), MultipleTypeField( [ (XShortField('memoryAddress', 0), @@ -422,6 +485,7 @@ def answers(self, other): bind_layers(GMLAN, GMLAN_RMBAPR, service=0x63) +GMLAN._service_cls[0x63] = GMLAN_RMBAPR # ########################SA################################### @@ -443,6 +507,7 @@ class GMLAN_SA(Packet): name = 'SecurityAccess' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x27, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions), ConditionalField(XShortField('securityKey', 0), lambda pkt: pkt.subfunction % 2 == 0) @@ -450,11 +515,13 @@ class GMLAN_SA(Packet): bind_layers(GMLAN, GMLAN_SA, service=0x27) +GMLAN._service_cls[0x27] = GMLAN_SA class GMLAN_SAPR(Packet): name = 'SecurityAccessPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x67, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, GMLAN_SA.subfunctions), ConditionalField(XShortField('securitySeed', 0), lambda pkt: pkt.subfunction % 2 == 1), @@ -466,23 +533,27 @@ def answers(self, other): bind_layers(GMLAN, GMLAN_SAPR, service=0x67) +GMLAN._service_cls[0x67] = GMLAN_SAPR # ########################DDM################################### class GMLAN_DDM(Packet): name = 'DynamicallyDefineMessage' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2c, GMLAN.services), _gmlan_slm), XByteField('DPIDIdentifier', 0), StrField('PIDData', b'\x00\x00') ] -bind_layers(GMLAN, GMLAN_DDM, service=0x2C) +bind_layers(GMLAN, GMLAN_DDM, service=0x2c) +GMLAN._service_cls[0x2c] = GMLAN_DDM class GMLAN_DDMPR(Packet): name = 'DynamicallyDefineMessagePositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6c, GMLAN.services), _gmlan_slm), XByteField('DPIDIdentifier', 0) ] @@ -491,13 +562,15 @@ def answers(self, other): and other.DPIDIdentifier == self.DPIDIdentifier -bind_layers(GMLAN, GMLAN_DDMPR, service=0x6C) +bind_layers(GMLAN, GMLAN_DDMPR, service=0x6c) +GMLAN._service_cls[0x6c] = GMLAN_DDMPR # ########################DPBA################################### class GMLAN_DPBA(Packet): name = 'DefinePIDByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2d, GMLAN.services), _gmlan_slm), XShortField('parameterIdentifier', 0), MultipleTypeField( [ @@ -513,12 +586,14 @@ class GMLAN_DPBA(Packet): ] -bind_layers(GMLAN, GMLAN_DPBA, service=0x2D) +bind_layers(GMLAN, GMLAN_DPBA, service=0x2d) +GMLAN._service_cls[0x2d] = GMLAN_DPBA class GMLAN_DPBAPR(Packet): name = 'DefinePIDByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6d, GMLAN.services), _gmlan_slm), XShortField('parameterIdentifier', 0), ] @@ -527,13 +602,15 @@ def answers(self, other): and other.parameterIdentifier == self.parameterIdentifier -bind_layers(GMLAN, GMLAN_DPBAPR, service=0x6D) +bind_layers(GMLAN, GMLAN_DPBAPR, service=0x6d) +GMLAN._service_cls[0x6d] = GMLAN_DPBAPR # ########################RD################################### class GMLAN_RD(Packet): name = 'RequestDownload' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x34, GMLAN.services), _gmlan_slm), XByteField('dataFormatIdentifier', 0), MultipleTypeField( [ @@ -549,6 +626,7 @@ class GMLAN_RD(Packet): bind_layers(GMLAN, GMLAN_RD, service=0x34) +GMLAN._service_cls[0x34] = GMLAN_RD # ########################TD################################### @@ -559,6 +637,7 @@ class GMLAN_TD(Packet): } name = 'TransferData' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x36, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions), MultipleTypeField( [ @@ -575,23 +654,27 @@ class GMLAN_TD(Packet): bind_layers(GMLAN, GMLAN_TD, service=0x36) +GMLAN._service_cls[0x36] = GMLAN_TD # ########################WDBI################################### class GMLAN_WDBI(Packet): name = 'WriteDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3b, GMLAN.services), _gmlan_slm), XByteEnumField('dataIdentifier', 0, GMLAN_RDBI.dataIdentifiers), StrField("dataRecord", b'') ] -bind_layers(GMLAN, GMLAN_WDBI, service=0x3B) +bind_layers(GMLAN, GMLAN_WDBI, service=0x3b) +GMLAN._service_cls[0x3b] = GMLAN_WDBI class GMLAN_WDBIPR(Packet): name = 'WriteDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7b, GMLAN.services), _gmlan_slm), XByteEnumField('dataIdentifier', 0, GMLAN_RDBI.dataIdentifiers) ] @@ -600,7 +683,8 @@ def answers(self, other): and other.dataIdentifier == self.dataIdentifier -bind_layers(GMLAN, GMLAN_WDBIPR, service=0x7B) +bind_layers(GMLAN, GMLAN_WDBIPR, service=0x7b) +GMLAN._service_cls[0x7b] = GMLAN_WDBIPR # ########################RPSPR################################### @@ -619,11 +703,13 @@ class GMLAN_RPSPR(Packet): } name = 'ReportProgrammedStatePositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xe2, GMLAN.services), _gmlan_slm), ByteEnumField('programmedState', 0, programmedStates), ] -bind_layers(GMLAN, GMLAN_RPSPR, service=0xE2) +bind_layers(GMLAN, GMLAN_RPSPR, service=0xe2) +GMLAN._service_cls[0xe2] = GMLAN_RPSPR # ########################PM################################### @@ -635,11 +721,13 @@ class GMLAN_PM(Packet): } name = 'ProgrammingMode' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xa5, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions), ] -bind_layers(GMLAN, GMLAN_PM, service=0xA5) +bind_layers(GMLAN, GMLAN_PM, service=0xa5) +GMLAN._service_cls[0xa5] = GMLAN_PM # ########################RDI################################### @@ -651,11 +739,13 @@ class GMLAN_RDI(Packet): } name = 'ReadDiagnosticInformation' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xa9, GMLAN.services), _gmlan_slm), ByteEnumField('subfunction', 0, subfunctions) ] -bind_layers(GMLAN, GMLAN_RDI, service=0xA9) +bind_layers(GMLAN, GMLAN_RDI, service=0xa9) +GMLAN._service_cls[0xa9] = GMLAN_RDI class GMLAN_RDI_BN(Packet): @@ -697,17 +787,20 @@ class GMLAN_RDI_BC(Packet): class GMLAN_DC(Packet): name = 'DeviceControl' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xae, GMLAN.services), _gmlan_slm), XByteField('CPIDNumber', 0), StrFixedLenField('CPIDControlBytes', b"", 5) ] -bind_layers(GMLAN, GMLAN_DC, service=0xAE) +bind_layers(GMLAN, GMLAN_DC, service=0xae) +GMLAN._service_cls[0xae] = GMLAN_DC class GMLAN_DCPR(Packet): name = 'DeviceControlPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xee, GMLAN.services), _gmlan_slm), XByteField('CPIDNumber', 0) ] @@ -716,7 +809,8 @@ def answers(self, other): and other.CPIDNumber == self.CPIDNumber -bind_layers(GMLAN, GMLAN_DCPR, service=0xEE) +bind_layers(GMLAN, GMLAN_DCPR, service=0xee) +GMLAN._service_cls[0xee] = GMLAN_DCPR # ########################NRC################################### @@ -739,6 +833,7 @@ class GMLAN_NR(Packet): } name = 'NegativeResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7f, GMLAN.services), _gmlan_slm), XByteEnumField('requestServiceId', 0, GMLAN.services), MayEnd(ByteEnumField('returnCode', 0, negativeResponseCodes)), # XXX Is this MayEnd correct? Why is the field below also 0xe3 ? @@ -752,3 +847,4 @@ def answers(self, other): bind_layers(GMLAN, GMLAN_NR, service=0x7f) +GMLAN._service_cls[0x7f] = GMLAN_NR diff --git a/scapy/contrib/automotive/kwp.py b/scapy/contrib/automotive/kwp.py index 5617c1d7a23..019319ee10f 100644 --- a/scapy/contrib/automotive/kwp.py +++ b/scapy/contrib/automotive/kwp.py @@ -21,16 +21,18 @@ XByteField, XShortEnumField, ) -from scapy.packet import Packet, bind_layers, NoPayload +from scapy.packet import Packet, NoPayload, bind_layers from scapy.config import conf from scapy.error import log_loading from scapy.utils import PeriodicSenderThread -from scapy.plist import _PacketIterable +from scapy.plist import _PacketIterable # noqa: F401 from scapy.contrib.isotp import ISOTP +from scapy.compat import orb -from typing import ( - Dict, +from typing import ( # noqa: F401 Any, + Dict, + Type, ) @@ -43,7 +45,34 @@ "a negative response 'requestCorrectlyReceived-" "ResponsePending' as answer of a request. \n" "The default value is False.") - conf.contribs['KWP'] = {'treat-response-pending-as-answer': False} + conf.contribs['KWP'] = {'treat-response-pending-as-answer': False, + 'single_layer_mode': False, + 'compatibility_mode': True} + + +def _kwp_slm(pkt): + # type: (Packet) -> bool + """Return True when the service ConditionalField should be present. + + Two configuration keys in ``conf.contribs['KWP']`` control the behaviour: + + ``single_layer_mode`` (bool, default ``False``): + When *True*, :class:`KWP` acts as a dispatch layer and returns the + matching service sub-packet directly. Each sub-packet gains its own + ``service`` field so that it can be built and dissected stand-alone. + + ``compatibility_mode`` (bool, default ``True``): + Only relevant when ``single_layer_mode`` is *True*. When *True* the + ``service`` field is **suppressed** in a sub-packet whose immediate + underlayer is already a :class:`KWP` packet, preventing a duplicate + service byte when sub-packets are stacked (``KWP()/KWP_SDS()``). + Set to *False* to always emit the ``service`` byte from the sub-packet. + """ + if not conf.contribs['KWP'].get('single_layer_mode', False): + return False + if conf.contribs['KWP'].get('compatibility_mode', True): + return pkt.underlayer is None or not isinstance(pkt.underlayer, KWP) + return True class KWP(ISOTP): @@ -113,13 +142,13 @@ def answers(self, other): if not isinstance(other, type(self)): return False if self.service == 0x7f: - return self.payload.answers(other) + return bool(self.payload.answers(other)) if self.service == (other.service + 0x40): if isinstance(self.payload, NoPayload) or \ isinstance(other.payload, NoPayload): return len(self) <= len(other) else: - return self.payload.answers(other.payload) + return bool(self.payload.answers(other.payload)) return False def hashret(self): @@ -129,6 +158,17 @@ def hashret(self): else: return struct.pack('B', self.service & ~0x40) + _service_cls = {} # type: Dict[int, Type[Packet]] + + @classmethod + def dispatch_hook(cls, _pkt=b"", *args, **kwargs): + # type: (bytes, Any, Any) -> type + """Dispatch to the correct KWP service class in single layer mode.""" + if conf.contribs['KWP'].get('single_layer_mode', False) and len(_pkt) >= 1: + service = orb(_pkt[0]) + return cls._service_cls.get(service, cls) + return cls + # ########################SDS################################### class KWP_SDS(Packet): @@ -140,16 +180,19 @@ class KWP_SDS(Packet): 0x92: 'extendedDiagnosticSession'}) name = 'StartDiagnosticSession' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x10, KWP.services), _kwp_slm), ByteEnumField('diagnosticSession', 0, diagnosticSessionTypes) ] bind_layers(KWP, KWP_SDS, service=0x10) +KWP._service_cls[0x10] = KWP_SDS class KWP_SDSPR(Packet): name = 'StartDiagnosticSessionPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x50, KWP.services), _kwp_slm), ByteEnumField('diagnosticSession', 0, KWP_SDS.diagnosticSessionTypes), ] @@ -161,6 +204,7 @@ def answers(self, other): bind_layers(KWP, KWP_SDSPR, service=0x50) +KWP._service_cls[0x50] = KWP_SDSPR # ######################### KWP_ER ################################### @@ -171,14 +215,19 @@ class KWP_ER(Packet): 0x82: 'nonvolatileMemoryReset'} name = 'ECUReset' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x11, KWP.services), _kwp_slm), ByteEnumField('resetMode', 0, resetModes) ] bind_layers(KWP, KWP_ER, service=0x11) +KWP._service_cls[0x11] = KWP_ER class KWP_ERPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x51, KWP.services), _kwp_slm), + ] name = 'ECUResetPositiveResponse' def answers(self, other): @@ -187,12 +236,14 @@ def answers(self, other): bind_layers(KWP, KWP_ERPR, service=0x51) +KWP._service_cls[0x51] = KWP_ERPR # ######################### KWP_SA ################################### class KWP_SA(Packet): name = 'SecurityAccess' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x27, KWP.services), _kwp_slm), ByteField('accessMode', 0), ConditionalField(StrField('key', b""), lambda pkt: pkt.accessMode % 2 == 0) @@ -200,11 +251,13 @@ class KWP_SA(Packet): bind_layers(KWP, KWP_SA, service=0x27) +KWP._service_cls[0x27] = KWP_SA class KWP_SAPR(Packet): name = 'SecurityAccessPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x67, KWP.services), _kwp_slm), ByteField('accessMode', 0), ConditionalField(StrField('seed', b""), lambda pkt: pkt.accessMode % 2 == 1), @@ -217,6 +270,7 @@ def answers(self, other): bind_layers(KWP, KWP_SAPR, service=0x67) +KWP._service_cls[0x67] = KWP_SAPR # ######################### KWP_IOCBLI ################################### @@ -231,6 +285,7 @@ class KWP_IOCBLI(Packet): 0x08: "Long Term Adjustment" } fields_desc = [ + ConditionalField(XByteEnumField('service', 0x30, KWP.services), _kwp_slm), XByteField('localIdentifier', 0), XByteEnumField('inputOutputControlParameter', 0, inputOutputControlParameters), @@ -239,11 +294,13 @@ class KWP_IOCBLI(Packet): bind_layers(KWP, KWP_IOCBLI, service=0x30) +KWP._service_cls[0x30] = KWP_IOCBLI class KWP_IOCBLIPR(Packet): name = 'InputOutputControlByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x70, KWP.services), _kwp_slm), XByteField('localIdentifier', 0), XByteEnumField('inputOutputControlParameter', 0, KWP_IOCBLI.inputOutputControlParameters), @@ -257,6 +314,7 @@ def answers(self, other): bind_layers(KWP, KWP_IOCBLIPR, service=0x70) +KWP._service_cls[0x70] = KWP_IOCBLIPR # ######################### KWP_DNMT ################################### @@ -267,14 +325,19 @@ class KWP_DNMT(Packet): } name = 'DisableNormalMessageTransmission' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x28, KWP.services), _kwp_slm), ByteEnumField('responseRequired', 0, responseTypes) ] bind_layers(KWP, KWP_DNMT, service=0x28) +KWP._service_cls[0x28] = KWP_DNMT class KWP_DNMTPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x68, KWP.services), _kwp_slm), + ] name = 'DisableNormalMessageTransmissionPositiveResponse' def answers(self, other): @@ -283,6 +346,7 @@ def answers(self, other): bind_layers(KWP, KWP_DNMTPR, service=0x68) +KWP._service_cls[0x68] = KWP_DNMTPR # ######################### KWP_ENMT ################################### @@ -293,14 +357,19 @@ class KWP_ENMT(Packet): } name = 'EnableNormalMessageTransmission' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x29, KWP.services), _kwp_slm), ByteEnumField('responseRequired', 1, responseTypes) ] bind_layers(KWP, KWP_ENMT, service=0x29) +KWP._service_cls[0x29] = KWP_ENMT class KWP_ENMTPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x69, KWP.services), _kwp_slm), + ] name = 'EnableNormalMessageTransmissionPositiveResponse' def answers(self, other): @@ -309,6 +378,7 @@ def answers(self, other): bind_layers(KWP, KWP_ENMTPR, service=0x69) +KWP._service_cls[0x69] = KWP_ENMTPR # ######################### KWP_TP ################################### @@ -319,14 +389,19 @@ class KWP_TP(Packet): } name = 'TesterPresent' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3e, KWP.services), _kwp_slm), ByteEnumField('responseRequired', 1, responseTypes) ] -bind_layers(KWP, KWP_TP, service=0x3E) +bind_layers(KWP, KWP_TP, service=0x3e) +KWP._service_cls[0x3e] = KWP_TP class KWP_TPPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7e, KWP.services), _kwp_slm), + ] name = 'TesterPresentPositiveResponse' def answers(self, other): @@ -334,7 +409,8 @@ def answers(self, other): return isinstance(other, KWP_TP) -bind_layers(KWP, KWP_TPPR, service=0x7E) +bind_layers(KWP, KWP_TPPR, service=0x7e) +KWP._service_cls[0x7e] = KWP_TPPR # ######################### KWP_CDTCS ################################### @@ -357,6 +433,7 @@ class KWP_CDTCS(Packet): } name = 'ControlDTCSetting' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x85, KWP.services), _kwp_slm), ByteEnumField('responseRequired', 1, responseTypes), XShortEnumField('groupOfDTC', 0, DTCGroups), ByteEnumField('DTCSettingMode', 0, DTCSettingModes), @@ -364,9 +441,13 @@ class KWP_CDTCS(Packet): bind_layers(KWP, KWP_CDTCS, service=0x85) +KWP._service_cls[0x85] = KWP_CDTCS class KWP_CDTCSPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc5, KWP.services), _kwp_slm), + ] name = 'ControlDTCSettingPositiveResponse' def answers(self, other): @@ -374,7 +455,8 @@ def answers(self, other): return isinstance(other, KWP_CDTCS) -bind_layers(KWP, KWP_CDTCSPR, service=0xC5) +bind_layers(KWP, KWP_CDTCSPR, service=0xc5) +KWP._service_cls[0xc5] = KWP_CDTCSPR # ######################### KWP_ROE ################################### @@ -399,6 +481,7 @@ class KWP_ROE(Packet): } name = 'ResponseOnEvent' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x86, KWP.services), _kwp_slm), ByteEnumField('responseRequired', 1, responseTypes), ByteEnumField('eventWindowTime', 0, eventWindowTimes), MayEnd(ByteEnumField('eventType', 0, eventTypes)), @@ -410,11 +493,13 @@ class KWP_ROE(Packet): bind_layers(KWP, KWP_ROE, service=0x86) +KWP._service_cls[0x86] = KWP_ROE class KWP_ROEPR(Packet): name = 'ResponseOnEventPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc6, KWP.services), _kwp_slm), ByteField("numberOfActivatedEvents", 0), MayEnd(ByteEnumField('eventWindowTime', 0, KWP_ROE.eventWindowTimes)), # XXX Is this MayEnd correct? @@ -427,7 +512,8 @@ def answers(self, other): and other.eventType == self.eventType -bind_layers(KWP, KWP_ROEPR, service=0xC6) +bind_layers(KWP, KWP_ROEPR, service=0xc6) +KWP._service_cls[0xc6] = KWP_ROEPR # ######################### KWP_RDBLI ################################### @@ -448,16 +534,19 @@ class KWP_RDBLI(Packet): }) name = 'ReadDataByLocalIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x21, KWP.services), _kwp_slm), XByteEnumField('recordLocalIdentifier', 0, localIdentifiers) ] bind_layers(KWP, KWP_RDBLI, service=0x21) +KWP._service_cls[0x21] = KWP_RDBLI class KWP_RDBLIPR(Packet): name = 'ReadDataByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x61, KWP.services), _kwp_slm), XByteEnumField('recordLocalIdentifier', 0, KWP_RDBLI.localIdentifiers) ] @@ -468,22 +557,26 @@ def answers(self, other): bind_layers(KWP, KWP_RDBLIPR, service=0x61) +KWP._service_cls[0x61] = KWP_RDBLIPR # ######################### KWP_WDBLI ################################### class KWP_WDBLI(Packet): name = 'WriteDataByLocalIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3b, KWP.services), _kwp_slm), XByteEnumField('recordLocalIdentifier', 0, KWP_RDBLI.localIdentifiers) ] -bind_layers(KWP, KWP_WDBLI, service=0x3B) +bind_layers(KWP, KWP_WDBLI, service=0x3b) +KWP._service_cls[0x3b] = KWP_WDBLI class KWP_WDBLIPR(Packet): name = 'WriteDataByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7b, KWP.services), _kwp_slm), XByteEnumField('recordLocalIdentifier', 0, KWP_RDBLI.localIdentifiers) ] @@ -493,7 +586,8 @@ def answers(self, other): and self.recordLocalIdentifier == other.recordLocalIdentifier -bind_layers(KWP, KWP_WDBLIPR, service=0x7B) +bind_layers(KWP, KWP_WDBLIPR, service=0x7b) +KWP._service_cls[0x7b] = KWP_WDBLIPR # ######################### KWP_RDBI ################################### @@ -501,16 +595,19 @@ class KWP_RDBI(Packet): dataIdentifiers = ObservableDict() name = 'ReadDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x22, KWP.services), _kwp_slm), XShortEnumField('identifier', 0, dataIdentifiers) ] bind_layers(KWP, KWP_RDBI, service=0x22) +KWP._service_cls[0x22] = KWP_RDBI class KWP_RDBIPR(Packet): name = 'ReadDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x62, KWP.services), _kwp_slm), XShortEnumField('identifier', 0, KWP_RDBI.dataIdentifiers), ] @@ -521,23 +618,27 @@ def answers(self, other): bind_layers(KWP, KWP_RDBIPR, service=0x62) +KWP._service_cls[0x62] = KWP_RDBIPR # ######################### KWP_RMBA ################################### class KWP_RMBA(Packet): name = 'ReadMemoryByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x23, KWP.services), _kwp_slm), X3BytesField('memoryAddress', 0), ByteField('memorySize', 0) ] bind_layers(KWP, KWP_RMBA, service=0x23) +KWP._service_cls[0x23] = KWP_RMBA class KWP_RMBAPR(Packet): name = 'ReadMemoryByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x63, KWP.services), _kwp_slm), StrField('dataRecord', b"", fmt="B") ] @@ -547,6 +648,7 @@ def answers(self, other): bind_layers(KWP, KWP_RMBAPR, service=0x63) +KWP._service_cls[0x63] = KWP_RMBAPR # ######################### KWP_DDLI ################################### @@ -559,18 +661,21 @@ class KWP_DDLI(Packet): 0x3: "defineByIdentifier", 0x4: "clearDynamicallyDefinedLocalIdentifier"} fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2c, KWP.services), _kwp_slm), XByteField('dynamicallyDefineLocalIdentifier', 0), ByteEnumField('definitionMode', 0, definitionModes), StrField('dataRecord', b"", fmt="B") ] -bind_layers(KWP, KWP_DDLI, service=0x2C) +bind_layers(KWP, KWP_DDLI, service=0x2c) +KWP._service_cls[0x2c] = KWP_DDLI class KWP_DDLIPR(Packet): name = 'DynamicallyDefineLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6c, KWP.services), _kwp_slm), XByteField('dynamicallyDefineLocalIdentifier', 0) ] @@ -580,23 +685,27 @@ def answers(self, other): other.dynamicallyDefineLocalIdentifier == self.dynamicallyDefineLocalIdentifier # noqa: E501 -bind_layers(KWP, KWP_DDLIPR, service=0x6C) +bind_layers(KWP, KWP_DDLIPR, service=0x6c) +KWP._service_cls[0x6c] = KWP_DDLIPR # ######################### KWP_WDBI ################################### class KWP_WDBI(Packet): name = 'WriteDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2e, KWP.services), _kwp_slm), XShortEnumField('identifier', 0, KWP_RDBI.dataIdentifiers) ] -bind_layers(KWP, KWP_WDBI, service=0x2E) +bind_layers(KWP, KWP_WDBI, service=0x2e) +KWP._service_cls[0x2e] = KWP_WDBI class KWP_WDBIPR(Packet): name = 'WriteDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6e, KWP.services), _kwp_slm), XShortEnumField('identifier', 0, KWP_RDBI.dataIdentifiers), ] @@ -606,25 +715,29 @@ def answers(self, other): and other.identifier == self.identifier -bind_layers(KWP, KWP_WDBIPR, service=0x6E) +bind_layers(KWP, KWP_WDBIPR, service=0x6e) +KWP._service_cls[0x6e] = KWP_WDBIPR # ######################### KWP_WMBA ################################### class KWP_WMBA(Packet): name = 'WriteMemoryByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3d, KWP.services), _kwp_slm), X3BytesField('memoryAddress', 0), ByteField('memorySize', 0), StrField('dataRecord', b'', fmt="B") ] -bind_layers(KWP, KWP_WMBA, service=0x3D) +bind_layers(KWP, KWP_WMBA, service=0x3d) +KWP._service_cls[0x3d] = KWP_WMBA class KWP_WMBAPR(Packet): name = 'WriteMemoryByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7d, KWP.services), _kwp_slm), X3BytesField('memoryAddress', 0) ] @@ -634,7 +747,8 @@ def answers(self, other): other.memoryAddress == self.memoryAddress -bind_layers(KWP, KWP_WMBAPR, service=0x7D) +bind_layers(KWP, KWP_WMBAPR, service=0x7d) +KWP._service_cls[0x7d] = KWP_WMBAPR # ######################### KWP_CDI ################################### @@ -648,17 +762,20 @@ class KWP_CDI(Packet): } name = 'ClearDiagnosticInformation' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x14, KWP.services), _kwp_slm), XShortEnumField('groupOfDTC', 0, DTCGroups) ] bind_layers(KWP, KWP_CDI, service=0x14) +KWP._service_cls[0x14] = KWP_CDI class KWP_CDIPR(Packet): name = 'ClearDiagnosticInformationPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x54, KWP.services), _kwp_slm), XShortEnumField('groupOfDTC', 0, KWP_CDI.DTCGroups) ] @@ -669,23 +786,27 @@ def answers(self, other): bind_layers(KWP, KWP_CDIPR, service=0x54) +KWP._service_cls[0x54] = KWP_CDIPR # ######################### KWP_RSODTC ################################### class KWP_RSODTC(Packet): name = 'ReadStatusOfDiagnosticTroubleCodes' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x17, KWP.services), _kwp_slm), XShortEnumField('groupOfDTC', 0, KWP_CDI.DTCGroups) ] bind_layers(KWP, KWP_RSODTC, service=0x17) +KWP._service_cls[0x17] = KWP_RSODTC class KWP_RSODTCPR(Packet): name = 'ReadStatusOfDiagnosticTroubleCodesPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x57, KWP.services), _kwp_slm), ByteField('numberOfDTC', 0), ] @@ -695,6 +816,7 @@ def answers(self, other): bind_layers(KWP, KWP_RSODTCPR, service=0x57) +KWP._service_cls[0x57] = KWP_RSODTCPR # ######################### KWP_RECUI ################################### @@ -716,17 +838,20 @@ class KWP_RECUI(Packet): 0x9F: "ECU Boot Fingerprint" }) fields_desc = [ + ConditionalField(XByteEnumField('service', 0x1a, KWP.services), _kwp_slm), XByteEnumField('localIdentifier', 0, localIdentifiers) ] -bind_layers(KWP, KWP_RECUI, service=0x1A) +bind_layers(KWP, KWP_RECUI, service=0x1a) +KWP._service_cls[0x1a] = KWP_RECUI class KWP_RECUIPR(Packet): name = 'ReadECUIdentificationPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x5a, KWP.services), _kwp_slm), XByteEnumField('localIdentifier', 0, KWP_RECUI.localIdentifiers) ] @@ -736,7 +861,8 @@ def answers(self, other): self.localIdentifier == other.localIdentifier -bind_layers(KWP, KWP_RECUIPR, service=0x5A) +bind_layers(KWP, KWP_RECUIPR, service=0x5a) +KWP._service_cls[0x5a] = KWP_RECUIPR # ######################### KWP_SRBLI ################################### @@ -755,16 +881,19 @@ class KWP_SRBLI(Packet): }) name = 'StartRoutineByLocalIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x31, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, routineLocalIdentifiers) ] bind_layers(KWP, KWP_SRBLI, service=0x31) +KWP._service_cls[0x31] = KWP_SRBLI class KWP_SRBLIPR(Packet): name = 'StartRoutineByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x71, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, KWP_SRBLI.routineLocalIdentifiers) ] @@ -776,23 +905,27 @@ def answers(self, other): bind_layers(KWP, KWP_SRBLIPR, service=0x71) +KWP._service_cls[0x71] = KWP_SRBLIPR # ######################### KWP_STRBLI ################################### class KWP_STRBLI(Packet): name = 'StopRoutineByLocalIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x32, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, KWP_SRBLI.routineLocalIdentifiers) ] bind_layers(KWP, KWP_STRBLI, service=0x32) +KWP._service_cls[0x32] = KWP_STRBLI class KWP_STRBLIPR(Packet): name = 'StopRoutineByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x72, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, KWP_SRBLI.routineLocalIdentifiers) ] @@ -804,23 +937,27 @@ def answers(self, other): bind_layers(KWP, KWP_STRBLIPR, service=0x72) +KWP._service_cls[0x72] = KWP_STRBLIPR # ######################### KWP_RRRBLI ################################### class KWP_RRRBLI(Packet): name = 'RequestRoutineResultsByLocalIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x33, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, KWP_SRBLI.routineLocalIdentifiers) ] bind_layers(KWP, KWP_RRRBLI, service=0x33) +KWP._service_cls[0x33] = KWP_RRRBLI class KWP_RRRBLIPR(Packet): name = 'RequestRoutineResultsByLocalIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x73, KWP.services), _kwp_slm), XByteEnumField('routineLocalIdentifier', 0, KWP_SRBLI.routineLocalIdentifiers) ] @@ -832,12 +969,14 @@ def answers(self, other): bind_layers(KWP, KWP_RRRBLIPR, service=0x73) +KWP._service_cls[0x73] = KWP_RRRBLIPR # ######################### KWP_RD ################################### class KWP_RD(Packet): name = 'RequestDownload' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x34, KWP.services), _kwp_slm), X3BytesField('memoryAddress', 0), BitField('compression', 0, 4), BitField('encryption', 0, 4), @@ -846,11 +985,13 @@ class KWP_RD(Packet): bind_layers(KWP, KWP_RD, service=0x34) +KWP._service_cls[0x34] = KWP_RD class KWP_RDPR(Packet): name = 'RequestDownloadPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x74, KWP.services), _kwp_slm), StrField('maxNumberOfBlockLength', b"", fmt="B"), ] @@ -860,12 +1001,14 @@ def answers(self, other): bind_layers(KWP, KWP_RDPR, service=0x74) +KWP._service_cls[0x74] = KWP_RDPR # ######################### KWP_RU ################################### class KWP_RU(Packet): name = 'RequestUpload' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x35, KWP.services), _kwp_slm), X3BytesField('memoryAddress', 0), BitField('compression', 0, 4), BitField('encryption', 0, 4), @@ -874,11 +1017,13 @@ class KWP_RU(Packet): bind_layers(KWP, KWP_RU, service=0x35) +KWP._service_cls[0x35] = KWP_RU class KWP_RUPR(Packet): name = 'RequestUploadPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x75, KWP.services), _kwp_slm), StrField('maxNumberOfBlockLength', b"", fmt="B"), ] @@ -888,23 +1033,27 @@ def answers(self, other): bind_layers(KWP, KWP_RUPR, service=0x75) +KWP._service_cls[0x75] = KWP_RUPR # ######################### KWP_TD ################################### class KWP_TD(Packet): name = 'TransferData' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x36, KWP.services), _kwp_slm), ByteField('blockSequenceCounter', 0), StrField('transferDataRequestParameter', b"", fmt="B") ] bind_layers(KWP, KWP_TD, service=0x36) +KWP._service_cls[0x36] = KWP_TD class KWP_TDPR(Packet): name = 'TransferDataPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x76, KWP.services), _kwp_slm), ByteField('blockSequenceCounter', 0), StrField('transferDataRequestParameter', b"", fmt="B") ] @@ -916,22 +1065,26 @@ def answers(self, other): bind_layers(KWP, KWP_TDPR, service=0x76) +KWP._service_cls[0x76] = KWP_TDPR # ######################### KWP_RTE ################################### class KWP_RTE(Packet): name = 'RequestTransferExit' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x37, KWP.services), _kwp_slm), StrField('transferDataRequestParameter', b"", fmt="B") ] bind_layers(KWP, KWP_RTE, service=0x37) +KWP._service_cls[0x37] = KWP_RTE class KWP_RTEPR(Packet): name = 'RequestTransferExitPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x77, KWP.services), _kwp_slm), StrField('transferDataRequestParameter', b"", fmt="B") ] @@ -941,6 +1094,7 @@ def answers(self, other): bind_layers(KWP, KWP_RTEPR, service=0x77) +KWP._service_cls[0x77] = KWP_RTEPR # ######################### KWP_NR ################################### @@ -970,6 +1124,7 @@ class KWP_NR(Packet): } name = 'NegativeResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7f, KWP.services), _kwp_slm), MayEnd(XByteEnumField('requestServiceId', 0, KWP.services)), # XXX Is this MayEnd correct? ByteEnumField('negativeResponseCode', 0, negativeResponseCodes) @@ -983,6 +1138,7 @@ def answers(self, other): bind_layers(KWP, KWP_NR, service=0x7f) +KWP._service_cls[0x7f] = KWP_NR # ################################################################## diff --git a/scapy/contrib/automotive/obd/iid/iids.py b/scapy/contrib/automotive/obd/iid/iids.py index 908f5b66d42..5c85c2c4834 100644 --- a/scapy/contrib/automotive/obd/iid/iids.py +++ b/scapy/contrib/automotive/obd/iid/iids.py @@ -6,11 +6,14 @@ # scapy.contrib.status = skip -from scapy.fields import FieldLenField, FieldListField, StrFixedLenField, \ - ByteField, ShortField, FlagsField, XByteField, PacketListField +from scapy.fields import ( + ConditionalField, FieldLenField, FieldListField, StrFixedLenField, + ByteField, ShortField, FlagsField, XByteEnumField, XByteField, + PacketListField +) from scapy.packet import Packet, bind_layers from scapy.contrib.automotive.obd.packet import OBD_Packet -from scapy.contrib.automotive.obd.services import OBD_S09 +from scapy.contrib.automotive.obd.services import OBD_S09, _OBD_SERVICES, _obd_slm # See https://en.wikipedia.org/wiki/OBD-II_PIDs#Service_09 @@ -26,6 +29,7 @@ class OBD_S09_PR_Record(Packet): class OBD_S09_PR(Packet): name = "Infotype IDs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x49, _OBD_SERVICES), _obd_slm), PacketListField("data_records", [], OBD_S09_PR_Record) ] diff --git a/scapy/contrib/automotive/obd/mid/mids.py b/scapy/contrib/automotive/obd/mid/mids.py index 8aa6b7b5624..0acd4a4df1c 100644 --- a/scapy/contrib/automotive/obd/mid/mids.py +++ b/scapy/contrib/automotive/obd/mid/mids.py @@ -6,11 +6,14 @@ # scapy.contrib.status = skip -from scapy.fields import FlagsField, ScalingField, ByteEnumField, \ - MultipleTypeField, ShortField, ShortEnumField, PacketListField +from scapy.fields import ( + ConditionalField, FlagsField, ScalingField, ByteEnumField, + XByteEnumField, MultipleTypeField, ShortField, ShortEnumField, + PacketListField +) from scapy.packet import Packet, bind_layers from scapy.contrib.automotive.obd.packet import OBD_Packet -from scapy.contrib.automotive.obd.services import OBD_S06 +from scapy.contrib.automotive.obd.services import OBD_S06, _OBD_SERVICES, _obd_slm def _unit_and_scaling_fields(name): @@ -457,6 +460,7 @@ class OBD_S06_PR_Record(Packet): class OBD_S06_PR(Packet): name = "On-Board monitoring IDs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x46, _OBD_SERVICES), _obd_slm), PacketListField("data_records", [], OBD_S06_PR_Record) ] diff --git a/scapy/contrib/automotive/obd/obd.py b/scapy/contrib/automotive/obd/obd.py index a165935d2c3..005264f3559 100644 --- a/scapy/contrib/automotive/obd/obd.py +++ b/scapy/contrib/automotive/obd/obd.py @@ -14,10 +14,17 @@ from scapy.contrib.automotive.obd.pid.pids import * from scapy.contrib.automotive.obd.tid.tids import * from scapy.contrib.automotive.obd.services import * -from scapy.packet import bind_layers, NoPayload +from scapy.contrib.automotive.obd.services import _OBD_SERVICES +from scapy.packet import NoPayload, bind_layers from scapy.config import conf from scapy.fields import XByteEnumField from scapy.contrib.isotp import ISOTP +from scapy.compat import orb + +from typing import ( # noqa: F401 + Dict, + Type, +) try: if conf.contribs['OBD']['treat-response-pending-as-answer']: @@ -28,32 +35,13 @@ # "a negative response 'requestCorrectlyReceived-" # "ResponsePending' as answer of a request. \n" # "The default value is False.") - conf.contribs['OBD'] = {'treat-response-pending-as-answer': False} + conf.contribs['OBD'] = {'treat-response-pending-as-answer': False, + 'single_layer_mode': False, + 'compatibility_mode': True} class OBD(ISOTP): - services = { - 0x01: 'CurrentPowertrainDiagnosticDataRequest', - 0x02: 'PowertrainFreezeFrameDataRequest', - 0x03: 'EmissionRelatedDiagnosticTroubleCodesRequest', - 0x04: 'ClearResetDiagnosticTroubleCodesRequest', - 0x05: 'OxygenSensorMonitoringTestResultsRequest', - 0x06: 'OnBoardMonitoringTestResultsRequest', - 0x07: 'PendingEmissionRelatedDiagnosticTroubleCodesRequest', - 0x08: 'ControlOperationRequest', - 0x09: 'VehicleInformationRequest', - 0x0A: 'PermanentDiagnosticTroubleCodesRequest', - 0x41: 'CurrentPowertrainDiagnosticDataResponse', - 0x42: 'PowertrainFreezeFrameDataResponse', - 0x43: 'EmissionRelatedDiagnosticTroubleCodesResponse', - 0x44: 'ClearResetDiagnosticTroubleCodesResponse', - 0x45: 'OxygenSensorMonitoringTestResultsResponse', - 0x46: 'OnBoardMonitoringTestResultsResponse', - 0x47: 'PendingEmissionRelatedDiagnosticTroubleCodesResponse', - 0x48: 'ControlOperationResponse', - 0x49: 'VehicleInformationResponse', - 0x4A: 'PermanentDiagnosticTroubleCodesResponse', - 0x7f: 'NegativeResponse'} + services = _OBD_SERVICES name = "On-board diagnostics" @@ -79,26 +67,71 @@ def answers(self, other): return self.payload.answers(other.payload) return False + _service_cls = {} # type: Dict[int, Type[Packet]] + + @classmethod + def dispatch_hook(cls, _pkt=b"", *args, **kwargs): + # type: (...) -> type + """Dispatch to the correct OBD service class in single layer mode.""" + if conf.contribs['OBD'].get('single_layer_mode', False) and len(_pkt) >= 1: + service = orb(_pkt[0]) + return cls._service_cls.get(service, cls) + return cls -# Service Bindings bind_layers(OBD, OBD_S01, service=0x01) +OBD._service_cls[0x01] = OBD_S01 + bind_layers(OBD, OBD_S02, service=0x02) +OBD._service_cls[0x02] = OBD_S02 + bind_layers(OBD, OBD_S03, service=0x03) +OBD._service_cls[0x03] = OBD_S03 + bind_layers(OBD, OBD_S04, service=0x04) +OBD._service_cls[0x04] = OBD_S04 + bind_layers(OBD, OBD_S06, service=0x06) +OBD._service_cls[0x06] = OBD_S06 + bind_layers(OBD, OBD_S07, service=0x07) +OBD._service_cls[0x07] = OBD_S07 + bind_layers(OBD, OBD_S08, service=0x08) +OBD._service_cls[0x08] = OBD_S08 + bind_layers(OBD, OBD_S09, service=0x09) +OBD._service_cls[0x09] = OBD_S09 + bind_layers(OBD, OBD_S0A, service=0x0A) +OBD._service_cls[0x0A] = OBD_S0A bind_layers(OBD, OBD_S01_PR, service=0x41) +OBD._service_cls[0x41] = OBD_S01_PR + bind_layers(OBD, OBD_S02_PR, service=0x42) +OBD._service_cls[0x42] = OBD_S02_PR + bind_layers(OBD, OBD_S03_PR, service=0x43) +OBD._service_cls[0x43] = OBD_S03_PR + bind_layers(OBD, OBD_S04_PR, service=0x44) +OBD._service_cls[0x44] = OBD_S04_PR + bind_layers(OBD, OBD_S06_PR, service=0x46) +OBD._service_cls[0x46] = OBD_S06_PR + bind_layers(OBD, OBD_S07_PR, service=0x47) +OBD._service_cls[0x47] = OBD_S07_PR + bind_layers(OBD, OBD_S08_PR, service=0x48) +OBD._service_cls[0x48] = OBD_S08_PR + bind_layers(OBD, OBD_S09_PR, service=0x49) +OBD._service_cls[0x49] = OBD_S09_PR + bind_layers(OBD, OBD_S0A_PR, service=0x4A) +OBD._service_cls[0x4A] = OBD_S0A_PR + bind_layers(OBD, OBD_NR, service=0x7F) +OBD._service_cls[0x7F] = OBD_NR diff --git a/scapy/contrib/automotive/obd/pid/pids.py b/scapy/contrib/automotive/obd/pid/pids.py index 6367ef1caa5..9ae039a52e1 100644 --- a/scapy/contrib/automotive/obd/pid/pids.py +++ b/scapy/contrib/automotive/obd/pid/pids.py @@ -7,9 +7,11 @@ # scapy.contrib.status = skip from scapy.packet import Packet, bind_layers -from scapy.fields import PacketListField +from scapy.fields import ConditionalField, PacketListField, XByteEnumField -from scapy.contrib.automotive.obd.services import OBD_S01, OBD_S02 +from scapy.contrib.automotive.obd.services import ( + OBD_S01, OBD_S02, _OBD_SERVICES, _obd_slm +) from scapy.contrib.automotive.obd.pid.pids_00_1F import * from scapy.contrib.automotive.obd.pid.pids_20_3F import * from scapy.contrib.automotive.obd.pid.pids_40_5F import * @@ -27,6 +29,7 @@ class OBD_S01_PR_Record(Packet): class OBD_S01_PR(Packet): name = "Parameter IDs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x41, _OBD_SERVICES), _obd_slm), PacketListField("data_records", [], OBD_S01_PR_Record) ] @@ -45,6 +48,7 @@ class OBD_S02_PR_Record(Packet): class OBD_S02_PR(Packet): name = "Parameter IDs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x42, _OBD_SERVICES), _obd_slm), PacketListField("data_records", [], OBD_S02_PR_Record) ] diff --git a/scapy/contrib/automotive/obd/pid/pids_00_1F.py b/scapy/contrib/automotive/obd/pid/pids_00_1F.py index 4cfc483021b..829aac6e5ec 100644 --- a/scapy/contrib/automotive/obd/pid/pids_00_1F.py +++ b/scapy/contrib/automotive/obd/pid/pids_00_1F.py @@ -19,7 +19,7 @@ class OBD_PID00(OBD_Packet): name = "PID_00_PIDsSupported" fields_desc = [ - FlagsField('supported_pids', b'', 32, [ + FlagsField('supported_pids', 0, 32, [ 'PID20', 'PID1F', 'PID1E', @@ -109,7 +109,7 @@ class OBD_PID01(OBD_Packet): class OBD_PID02(OBD_Packet): name = "PID_02_FreezeDtc" fields_desc = [ - PacketField('dtc', b'', OBD_DTC) + PacketField('dtc', None, OBD_DTC) ] @@ -250,7 +250,7 @@ class OBD_PID12(OBD_Packet): class OBD_PID13(OBD_Packet): name = "PID_13_OxygenSensorsPresent" fields_desc = [ - FlagsField('sensors_present', b'', 8, [ + FlagsField('sensors_present', 0, 8, [ 'Bank1Sensor1', 'Bank1Sensor2', 'Bank1Sensor3', diff --git a/scapy/contrib/automotive/obd/services.py b/scapy/contrib/automotive/obd/services.py index f00cf0dd519..57303bc13fb 100644 --- a/scapy/contrib/automotive/obd/services.py +++ b/scapy/contrib/automotive/obd/services.py @@ -7,11 +7,68 @@ # scapy.contrib.status = skip from scapy.fields import ByteField, XByteField, BitEnumField, \ - PacketListField, XBitField, XByteEnumField, FieldListField, FieldLenField + PacketListField, XBitField, XByteEnumField, FieldListField, \ + FieldLenField, ConditionalField from scapy.packet import Packet from scapy.contrib.automotive.obd.packet import OBD_Packet from scapy.config import conf +_OBD_SERVICES = { + 0x01: 'CurrentPowertrainDiagnosticDataRequest', + 0x02: 'PowertrainFreezeFrameDataRequest', + 0x03: 'EmissionRelatedDiagnosticTroubleCodesRequest', + 0x04: 'ClearResetDiagnosticTroubleCodesRequest', + 0x05: 'OxygenSensorMonitoringTestResultsRequest', + 0x06: 'OnBoardMonitoringTestResultsRequest', + 0x07: 'PendingEmissionRelatedDiagnosticTroubleCodesRequest', + 0x08: 'ControlOperationRequest', + 0x09: 'VehicleInformationRequest', + 0x0A: 'PermanentDiagnosticTroubleCodesRequest', + 0x41: 'CurrentPowertrainDiagnosticDataResponse', + 0x42: 'PowertrainFreezeFrameDataResponse', + 0x43: 'EmissionRelatedDiagnosticTroubleCodesResponse', + 0x44: 'ClearResetDiagnosticTroubleCodesResponse', + 0x45: 'OxygenSensorMonitoringTestResultsResponse', + 0x46: 'OnBoardMonitoringTestResultsResponse', + 0x47: 'PendingEmissionRelatedDiagnosticTroubleCodesResponse', + 0x48: 'ControlOperationResponse', + 0x49: 'VehicleInformationResponse', + 0x4A: 'PermanentDiagnosticTroubleCodesResponse', + 0x7f: 'NegativeResponse', +} + + +def _obd_slm(pkt): + # type: (Packet) -> bool + """Return True when the service ConditionalField should be present. + + Two configuration keys in ``conf.contribs['OBD']`` control the behaviour: + + ``single_layer_mode`` (bool, default ``False``): + When *True*, :class:`OBD` acts as a dispatch layer and returns the + matching service sub-packet directly. Each sub-packet gains its own + ``service`` field so that it can be built and dissected stand-alone. + + ``compatibility_mode`` (bool, default ``True``): + Only relevant when ``single_layer_mode`` is *True*. When *True* the + ``service`` field is **suppressed** in a sub-packet whose immediate + underlayer is already an :class:`OBD` packet, preventing a duplicate + service byte when sub-packets are stacked (``OBD()/OBD_S01()``). + Set to *False* to always emit the ``service`` byte from the sub-packet. + + .. note:: + OBD service classes live in ``services.py`` which is imported by + ``obd.py``. To avoid a circular import the underlayer class is + identified by its class name (``'OBD'``) rather than by an + ``isinstance`` check. + """ + if not conf.contribs['OBD'].get('single_layer_mode', False): + return False + if conf.contribs['OBD'].get('compatibility_mode', True): + ul = pkt.underlayer + return ul is None or type(ul).__name__ != 'OBD' + return True + class OBD_DTC(OBD_Packet): name = "DiagnosticTroubleCode" @@ -45,6 +102,7 @@ class OBD_NR(Packet): } fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7F, _OBD_SERVICES), _obd_slm), XByteField('request_service_id', 0), XByteEnumField('response_code', 0, responses) ] @@ -58,6 +116,7 @@ def answers(self, other): class OBD_S01(Packet): name = "S1_CurrentData" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x01, _OBD_SERVICES), _obd_slm), FieldListField("pid", [], XByteField('', 0)) ] @@ -72,17 +131,22 @@ class OBD_S02_Record(OBD_Packet): class OBD_S02(Packet): name = "S2_FreezeFrameData" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x02, _OBD_SERVICES), _obd_slm), PacketListField("requests", [], OBD_S02_Record) ] class OBD_S03(Packet): name = "S3_RequestDTCs" + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x03, _OBD_SERVICES), _obd_slm), + ] class OBD_S03_PR(Packet): name = "S3_ResponseDTCs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x43, _OBD_SERVICES), _obd_slm), FieldLenField('count', None, count_of='dtcs', fmt='B'), PacketListField('dtcs', [], OBD_DTC, count_from=lambda pkt: pkt.count) ] @@ -93,10 +157,16 @@ def answers(self, other): class OBD_S04(Packet): name = "S4_ClearDTCs" + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x04, _OBD_SERVICES), _obd_slm), + ] class OBD_S04_PR(Packet): name = "S4_ClearDTCsPositiveResponse" + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x44, _OBD_SERVICES), _obd_slm), + ] def answers(self, other): return isinstance(other, OBD_S04) @@ -105,17 +175,22 @@ def answers(self, other): class OBD_S06(Packet): name = "S6_OnBoardDiagnosticMonitoring" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x06, _OBD_SERVICES), _obd_slm), FieldListField("mid", [], XByteField('', 0)) ] class OBD_S07(Packet): name = "S7_RequestPendingDTCs" + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x07, _OBD_SERVICES), _obd_slm), + ] class OBD_S07_PR(Packet): name = "S7_ResponsePendingDTCs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x47, _OBD_SERVICES), _obd_slm), FieldLenField('count', None, count_of='dtcs', fmt='B'), PacketListField('dtcs', [], OBD_DTC, count_from=lambda pkt: pkt.count) ] @@ -127,6 +202,7 @@ def answers(self, other): class OBD_S08(Packet): name = "S8_RequestControlOfSystem" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x08, _OBD_SERVICES), _obd_slm), FieldListField("tid", [], XByteField('', 0)) ] @@ -134,17 +210,22 @@ class OBD_S08(Packet): class OBD_S09(Packet): name = "S9_VehicleInformation" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x09, _OBD_SERVICES), _obd_slm), FieldListField("iid", [], XByteField('', 0)) ] class OBD_S0A(Packet): name = "S0A_RequestPermanentDTCs" + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x0A, _OBD_SERVICES), _obd_slm), + ] class OBD_S0A_PR(Packet): name = "S0A_ResponsePermanentDTCs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x4A, _OBD_SERVICES), _obd_slm), FieldLenField('count', None, count_of='dtcs', fmt='B'), PacketListField('dtcs', [], OBD_DTC, count_from=lambda pkt: pkt.count) ] diff --git a/scapy/contrib/automotive/obd/tid/tids.py b/scapy/contrib/automotive/obd/tid/tids.py index 27bda0df58a..cf92b7acd01 100644 --- a/scapy/contrib/automotive/obd/tid/tids.py +++ b/scapy/contrib/automotive/obd/tid/tids.py @@ -6,10 +6,13 @@ # scapy.contrib.status = skip -from scapy.fields import FlagsField, ByteField, ScalingField, PacketListField +from scapy.fields import ( + ConditionalField, FlagsField, ByteField, ScalingField, PacketListField, + XByteEnumField +) from scapy.packet import bind_layers, Packet from scapy.contrib.automotive.obd.packet import OBD_Packet -from scapy.contrib.automotive.obd.services import OBD_S08 +from scapy.contrib.automotive.obd.services import OBD_S08, _OBD_SERVICES, _obd_slm class _OBD_TID_Voltage(OBD_Packet): @@ -132,6 +135,7 @@ class OBD_S08_PR_Record(Packet): class OBD_S08_PR(Packet): name = "Control Operation IDs" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x48, _OBD_SERVICES), _obd_slm), PacketListField("data_records", [], OBD_S08_PR_Record) ] diff --git a/scapy/contrib/automotive/uds.py b/scapy/contrib/automotive/uds.py index 978bf6a0483..aedd4f5652b 100644 --- a/scapy/contrib/automotive/uds.py +++ b/scapy/contrib/automotive/uds.py @@ -19,14 +19,16 @@ ShortField, ObservableDict, XShortEnumField, XByteEnumField, StrLenField, \ FieldLenField, XStrFixedLenField, XStrLenField, FlagsField, PacketListField, \ PacketField -from scapy.packet import Packet, bind_layers, NoPayload, Raw +from scapy.packet import Packet, NoPayload, Raw, bind_layers +from scapy.compat import orb from scapy.config import conf from scapy.utils import PeriodicSenderThread from scapy.contrib.isotp import ISOTP # Typing imports -from typing import ( +from typing import ( # noqa: F401 Dict, + Type, Union, ) @@ -39,11 +41,41 @@ # "a negative response 'requestCorrectlyReceived-" # "ResponsePending' as answer of a request. \n" # "The default value is False.") - conf.contribs['UDS'] = {'treat-response-pending-as-answer': False} + conf.contribs['UDS'] = {'treat-response-pending-as-answer': False, + 'single_layer_mode': False, + 'compatibility_mode': True} conf.debug_dissector = True +def _uds_slm(pkt): + # type: (Packet) -> bool + """Return True when the service ConditionalField should be present. + + Two configuration keys in ``conf.contribs['UDS']`` control the behaviour: + + ``single_layer_mode`` (bool, default ``False``): + When *True*, :class:`UDS` acts as a dispatch layer and returns the + matching service sub-packet directly (e.g. + ``UDS(b'\\x10\\x01')`` → ``UDS_DSC``). Each sub-packet gains its + own ``service`` field so that it can be built and dissected + stand-alone. + + ``compatibility_mode`` (bool, default ``True``): + Only relevant when ``single_layer_mode`` is *True*. When *True* the + ``service`` field is **suppressed** in a sub-packet whose immediate + underlayer is already a :class:`UDS` packet. This prevents a + duplicate service byte when a sub-packet is stacked on top of a UDS + base layer (``UDS()/UDS_DSC()``). Set to *False* to always emit the + ``service`` byte from the sub-packet regardless of stacking. + """ + if not conf.contribs['UDS'].get('single_layer_mode', False): + return False + if conf.contribs['UDS'].get('compatibility_mode', True): + return pkt.underlayer is None or not isinstance(pkt.underlayer, UDS) + return True + + class UDS(ISOTP): services = ObservableDict( {0x10: 'DiagnosticSessionControl', @@ -111,13 +143,13 @@ def answers(self, other): if other.__class__ != self.__class__: return False if self.service == 0x7f: - return self.payload.answers(other) + return bool(self.payload.answers(other)) if self.service == (other.service + 0x40): if isinstance(self.payload, NoPayload) or \ isinstance(other.payload, NoPayload): return len(self) <= len(other) else: - return self.payload.answers(other.payload) + return bool(self.payload.answers(other.payload)) return False def hashret(self): @@ -126,6 +158,17 @@ def hashret(self): return struct.pack('B', bytes(self)[1] & ~0x40) return struct.pack('B', self.service & ~0x40) + _service_cls = {} # type: Dict[int, Type[Packet]] + + @classmethod + def dispatch_hook(cls, _pkt=b"", *args, **kwargs): + # type: (...) -> type + """Dispatch to the correct UDS service class in single layer mode.""" + if conf.contribs['UDS'].get('single_layer_mode', False) and len(_pkt) >= 1: + service = orb(_pkt[0]) + return cls._service_cls.get(service, cls) + return cls + # ########################DSC################################### class UDS_DSC(Packet): @@ -138,16 +181,19 @@ class UDS_DSC(Packet): 0x7F: 'ISOSAEReserved'}) name = 'DiagnosticSessionControl' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x10, UDS.services), _uds_slm), ByteEnumField('diagnosticSessionType', 0, diagnosticSessionTypes) ] bind_layers(UDS, UDS_DSC, service=0x10) +UDS._service_cls[0x10] = UDS_DSC class UDS_DSCPR(Packet): name = 'DiagnosticSessionControlPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x50, UDS.services), _uds_slm), ByteEnumField('diagnosticSessionType', 0, UDS_DSC.diagnosticSessionTypes), StrField('sessionParameterRecord', b"") @@ -159,6 +205,7 @@ def answers(self, other): bind_layers(UDS, UDS_DSCPR, service=0x50) +UDS._service_cls[0x50] = UDS_DSCPR # #########################ER################################### @@ -174,16 +221,19 @@ class UDS_ER(Packet): 0x7F: 'ISOSAEReserved'} name = 'ECUReset' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x11, UDS.services), _uds_slm), ByteEnumField('resetType', 0, resetTypes) ] bind_layers(UDS, UDS_ER, service=0x11) +UDS._service_cls[0x11] = UDS_ER class UDS_ERPR(Packet): name = 'ECUResetPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x51, UDS.services), _uds_slm), ByteEnumField('resetType', 0, UDS_ER.resetTypes), ConditionalField(ByteField('powerDownTime', 0), lambda pkt: pkt.resetType == 0x04) @@ -194,12 +244,14 @@ def answers(self, other): bind_layers(UDS, UDS_ERPR, service=0x51) +UDS._service_cls[0x51] = UDS_ERPR # #########################SA################################### class UDS_SA(Packet): name = 'SecurityAccess' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x27, UDS.services), _uds_slm), ByteField('securityAccessType', 0), ConditionalField(StrField('securityAccessDataRecord', b""), lambda pkt: pkt.securityAccessType % 2 == 1), @@ -209,11 +261,13 @@ class UDS_SA(Packet): bind_layers(UDS, UDS_SA, service=0x27) +UDS._service_cls[0x27] = UDS_SA class UDS_SAPR(Packet): name = 'SecurityAccessPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x67, UDS.services), _uds_slm), ByteField('securityAccessType', 0), ConditionalField(StrField('securitySeed', b""), lambda pkt: pkt.securityAccessType % 2 == 1), @@ -225,6 +279,7 @@ def answers(self, other): bind_layers(UDS, UDS_SAPR, service=0x67) +UDS._service_cls[0x67] = UDS_SAPR # #########################CC################################### @@ -237,6 +292,7 @@ class UDS_CC(Packet): } name = 'CommunicationControl' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x28, UDS.services), _uds_slm), ByteEnumField('controlType', 0, controlTypes), BitEnumField('communicationType0', 0, 2, {0: 'ISOSAEReserved', @@ -266,11 +322,13 @@ class UDS_CC(Packet): bind_layers(UDS, UDS_CC, service=0x28) +UDS._service_cls[0x28] = UDS_CC class UDS_CCPR(Packet): name = 'CommunicationControlPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x68, UDS.services), _uds_slm), ByteEnumField('controlType', 0, UDS_CC.controlTypes) ] @@ -280,6 +338,7 @@ def answers(self, other): bind_layers(UDS, UDS_CCPR, service=0x68) +UDS._service_cls[0x68] = UDS_CCPR # #########################AUTH################################### @@ -298,13 +357,15 @@ class UDS_AUTH(Packet): } name = "Authentication" fields_desc = [ + ConditionalField(XByteEnumField('service', 0x29, UDS.services), _uds_slm), ByteEnumField('subFunction', 0, subFunctions), ConditionalField(XByteField('communicationConfiguration', 0), lambda pkt: pkt.subFunction in [0x01, 0x02, 0x5]), ConditionalField(XShortField('certificateEvaluationId', 0), lambda pkt: pkt.subFunction == 0x04), - ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16), - lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), + ConditionalField( + XStrFixedLenField('algorithmIndicator', b'\x00' * 16, length=16), + lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), ConditionalField(FieldLenField('lengthOfCertificateClient', None, fmt="H", length_of='certificateClient'), lambda pkt: pkt.subFunction in [0x01, 0x02]), @@ -356,6 +417,7 @@ class UDS_AUTH(Packet): bind_layers(UDS, UDS_AUTH, service=0x29) +UDS._service_cls[0x29] = UDS_AUTH class UDS_AUTHPR(Packet): @@ -379,10 +441,12 @@ class UDS_AUTHPR(Packet): } name = 'AuthenticationPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x69, UDS.services), _uds_slm), ByteEnumField('subFunction', 0, UDS_AUTH.subFunctions), ByteEnumField('returnValue', 0, authenticationReturnParameterTypes), - ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16), - lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), + ConditionalField( + XStrFixedLenField('algorithmIndicator', b'\x00' * 16, length=16), + lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), ConditionalField(FieldLenField('lengthOfChallengeServer', None, fmt="H", length_of='challengeServer'), lambda pkt: pkt.subFunction in [0x01, 0x02, 0x05]), @@ -436,22 +500,26 @@ def answers(self, other): bind_layers(UDS, UDS_AUTHPR, service=0x69) +UDS._service_cls[0x69] = UDS_AUTHPR # #########################TP################################### class UDS_TP(Packet): name = 'TesterPresent' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3e, UDS.services), _uds_slm), ByteField('subFunction', 0) ] -bind_layers(UDS, UDS_TP, service=0x3E) +bind_layers(UDS, UDS_TP, service=0x3e) +UDS._service_cls[0x3e] = UDS_TP class UDS_TPPR(Packet): name = 'TesterPresentPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7e, UDS.services), _uds_slm), ByteField('zeroSubFunction', 0) ] @@ -459,7 +527,8 @@ def answers(self, other): return isinstance(other, UDS_TP) -bind_layers(UDS, UDS_TPPR, service=0x7E) +bind_layers(UDS, UDS_TPPR, service=0x7e) +UDS._service_cls[0x7e] = UDS_TPPR # #########################ATP################################### @@ -473,6 +542,7 @@ class UDS_ATP(Packet): } name = 'AccessTimingParameter' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x83, UDS.services), _uds_slm), ByteEnumField('timingParameterAccessType', 0, timingParameterAccessTypes), ConditionalField(StrField('timingParameterRequestRecord', b""), @@ -481,11 +551,13 @@ class UDS_ATP(Packet): bind_layers(UDS, UDS_ATP, service=0x83) +UDS._service_cls[0x83] = UDS_ATP class UDS_ATPPR(Packet): name = 'AccessTimingParameterPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc3, UDS.services), _uds_slm), ByteEnumField('timingParameterAccessType', 0, UDS_ATP.timingParameterAccessTypes), ConditionalField(StrField('timingParameterResponseRecord', b""), @@ -498,7 +570,8 @@ def answers(self, other): self.timingParameterAccessType -bind_layers(UDS, UDS_ATPPR, service=0xC3) +bind_layers(UDS, UDS_ATPPR, service=0xc3) +UDS._service_cls[0xc3] = UDS_ATPPR # #########################SDT################################### @@ -507,6 +580,7 @@ def answers(self, other): class UDS_SDT(Packet): name = 'SecuredDataTransmission' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x84, UDS.services), _uds_slm), BitField('requestMessage', 0, 1), BitField('ISOSAEReservedBackwardsCompatibility', 0, 2), BitField('preEstablishedKeyUsed', 0, 1), @@ -523,11 +597,13 @@ class UDS_SDT(Packet): bind_layers(UDS, UDS_SDT, service=0x84) +UDS._service_cls[0x84] = UDS_SDT class UDS_SDTPR(Packet): name = 'SecuredDataTransmissionPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc4, UDS.services), _uds_slm), BitField('requestMessage', 0, 1), BitField('ISOSAEReservedBackwardsCompatibility', 0, 2), BitField('preEstablishedKeyUsed', 0, 1), @@ -546,7 +622,8 @@ def answers(self, other): return isinstance(other, UDS_SDT) -bind_layers(UDS, UDS_SDTPR, service=0xC4) +bind_layers(UDS, UDS_SDTPR, service=0xc4) +UDS._service_cls[0xc4] = UDS_SDTPR # #########################CDTCS################################### @@ -558,17 +635,20 @@ class UDS_CDTCS(Packet): } name = 'ControlDTCSetting' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x85, UDS.services), _uds_slm), ByteEnumField('DTCSettingType', 0, DTCSettingTypes), StrField('DTCSettingControlOptionRecord', b"") ] bind_layers(UDS, UDS_CDTCS, service=0x85) +UDS._service_cls[0x85] = UDS_CDTCS class UDS_CDTCSPR(Packet): name = 'ControlDTCSettingPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc5, UDS.services), _uds_slm), ByteEnumField('DTCSettingType', 0, UDS_CDTCS.DTCSettingTypes) ] @@ -576,7 +656,8 @@ def answers(self, other): return isinstance(other, UDS_CDTCS) -bind_layers(UDS, UDS_CDTCSPR, service=0xC5) +bind_layers(UDS, UDS_CDTCSPR, service=0xc5) +UDS._service_cls[0xc5] = UDS_CDTCSPR # #########################ROE################################### @@ -588,6 +669,7 @@ class UDS_ROE(Packet): } name = 'ResponseOnEvent' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x86, UDS.services), _uds_slm), ByteEnumField('eventType', 0, eventTypes), ByteField('eventWindowTime', 0), StrField('eventTypeRecord', b"") @@ -595,11 +677,13 @@ class UDS_ROE(Packet): bind_layers(UDS, UDS_ROE, service=0x86) +UDS._service_cls[0x86] = UDS_ROE class UDS_ROEPR(Packet): name = 'ResponseOnEventPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc6, UDS.services), _uds_slm), ByteEnumField('eventType', 0, UDS_ROE.eventTypes), ByteField('numberOfIdentifiedEvents', 0), ByteField('eventWindowTime', 0), @@ -611,7 +695,8 @@ def answers(self, other): and other.eventType == self.eventType -bind_layers(UDS, UDS_ROEPR, service=0xC6) +bind_layers(UDS, UDS_ROEPR, service=0xc6) +UDS._service_cls[0xc6] = UDS_ROEPR # #########################LC################################### @@ -624,6 +709,7 @@ class UDS_LC(Packet): } name = 'LinkControl' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x87, UDS.services), _uds_slm), ByteEnumField('linkControlType', 0, linkControlTypes), ConditionalField(ByteField('baudrateIdentifier', 0), lambda pkt: pkt.linkControlType == 0x1), @@ -637,11 +723,13 @@ class UDS_LC(Packet): bind_layers(UDS, UDS_LC, service=0x87) +UDS._service_cls[0x87] = UDS_LC class UDS_LCPR(Packet): name = 'LinkControlPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0xc7, UDS.services), _uds_slm), ByteEnumField('linkControlType', 0, UDS_LC.linkControlTypes) ] @@ -650,7 +738,8 @@ def answers(self, other): and other.linkControlType == self.linkControlType -bind_layers(UDS, UDS_LCPR, service=0xC7) +bind_layers(UDS, UDS_LCPR, service=0xc7) +UDS._service_cls[0xc7] = UDS_LCPR # #########################RDBI################################### @@ -658,6 +747,7 @@ class UDS_RDBI(Packet): dataIdentifiers = ObservableDict() name = 'ReadDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x22, UDS.services), _uds_slm), FieldListField("identifiers", None, XShortEnumField('dataIdentifier', 0, dataIdentifiers)) @@ -665,11 +755,13 @@ class UDS_RDBI(Packet): bind_layers(UDS, UDS_RDBI, service=0x22) +UDS._service_cls[0x22] = UDS_RDBI class UDS_RDBIPR(Packet): name = 'ReadDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x62, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), ] @@ -680,12 +772,14 @@ def answers(self, other): bind_layers(UDS, UDS_RDBIPR, service=0x62) +UDS._service_cls[0x62] = UDS_RDBIPR # #########################RMBA################################### class UDS_RMBA(Packet): name = 'ReadMemoryByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x23, UDS.services), _uds_slm), BitField('memorySizeLen', 0, 4), BitField('memoryAddressLen', 0, 4), ConditionalField(XByteField('memoryAddress1', 0), @@ -708,11 +802,13 @@ class UDS_RMBA(Packet): bind_layers(UDS, UDS_RMBA, service=0x23) +UDS._service_cls[0x23] = UDS_RMBA class UDS_RMBAPR(Packet): name = 'ReadMemoryByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x63, UDS.services), _uds_slm), StrField('dataRecord', b"", fmt="B") ] @@ -721,6 +817,7 @@ def answers(self, other): bind_layers(UDS, UDS_RMBAPR, service=0x63) +UDS._service_cls[0x63] = UDS_RMBAPR # #########################RSDBI################################### @@ -728,17 +825,20 @@ class UDS_RSDBI(Packet): name = 'ReadScalingDataByIdentifier' dataIdentifiers = ObservableDict() fields_desc = [ + ConditionalField(XByteEnumField('service', 0x24, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, dataIdentifiers) ] bind_layers(UDS, UDS_RSDBI, service=0x24) +UDS._service_cls[0x24] = UDS_RSDBI # TODO: Implement correct scaling here, instead of using just the dataRecord class UDS_RSDBIPR(Packet): name = 'ReadScalingDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x64, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RSDBI.dataIdentifiers), ByteField('scalingByte', 0), StrField('dataRecord', b"", fmt="B") @@ -750,6 +850,7 @@ def answers(self, other): bind_layers(UDS, UDS_RSDBIPR, service=0x64) +UDS._service_cls[0x64] = UDS_RSDBIPR # #########################RDBPI################################### @@ -764,19 +865,22 @@ class UDS_RDBPI(Packet): } name = 'ReadDataByPeriodicIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2a, UDS.services), _uds_slm), ByteEnumField('transmissionMode', 0, transmissionModes), ByteEnumField('periodicDataIdentifier', 0, periodicDataIdentifiers), StrField('furtherPeriodicDataIdentifier', b"", fmt="B") ] -bind_layers(UDS, UDS_RDBPI, service=0x2A) +bind_layers(UDS, UDS_RDBPI, service=0x2a) +UDS._service_cls[0x2a] = UDS_RDBPI # TODO: Implement correct scaling here, instead of using just the dataRecord class UDS_RDBPIPR(Packet): name = 'ReadDataByPeriodicIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6a, UDS.services), _uds_slm), ByteField('periodicDataIdentifier', 0), StrField('dataRecord', b"", fmt="B") ] @@ -786,7 +890,8 @@ def answers(self, other): and other.periodicDataIdentifier == self.periodicDataIdentifier -bind_layers(UDS, UDS_RDBPIPR, service=0x6A) +bind_layers(UDS, UDS_RDBPIPR, service=0x6a) +UDS._service_cls[0x6a] = UDS_RDBPIPR # #########################DDDI################################### @@ -798,17 +903,20 @@ class UDS_DDDI(Packet): 0x2: "defineByMemoryAddress", 0x3: "clearDynamicallyDefinedDataIdentifier"} fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2c, UDS.services), _uds_slm), ByteEnumField('subFunction', 0, subFunctions), StrField('dataRecord', b"", fmt="B") ] -bind_layers(UDS, UDS_DDDI, service=0x2C) +bind_layers(UDS, UDS_DDDI, service=0x2c) +UDS._service_cls[0x2c] = UDS_DDDI class UDS_DDDIPR(Packet): name = 'DynamicallyDefineDataIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6c, UDS.services), _uds_slm), ByteEnumField('subFunction', 0, UDS_DDDI.subFunctions), XShortField('dynamicallyDefinedDataIdentifier', 0) ] @@ -818,24 +926,28 @@ def answers(self, other): and other.subFunction == self.subFunction -bind_layers(UDS, UDS_DDDIPR, service=0x6C) +bind_layers(UDS, UDS_DDDIPR, service=0x6c) +UDS._service_cls[0x6c] = UDS_DDDIPR # #########################WDBI################################### class UDS_WDBI(Packet): name = 'WriteDataByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2e, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers) ] -bind_layers(UDS, UDS_WDBI, service=0x2E) +bind_layers(UDS, UDS_WDBI, service=0x2e) +UDS._service_cls[0x2e] = UDS_WDBI class UDS_WDBIPR(Packet): name = 'WriteDataByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6e, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), ] @@ -845,13 +957,15 @@ def answers(self, other): and other.dataIdentifier == self.dataIdentifier -bind_layers(UDS, UDS_WDBIPR, service=0x6E) +bind_layers(UDS, UDS_WDBIPR, service=0x6e) +UDS._service_cls[0x6e] = UDS_WDBIPR # #########################WMBA################################### class UDS_WMBA(Packet): name = 'WriteMemoryByAddress' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x3d, UDS.services), _uds_slm), BitField('memorySizeLen', 0, 4), BitField('memoryAddressLen', 0, 4), ConditionalField(XByteField('memoryAddress1', 0), @@ -875,12 +989,14 @@ class UDS_WMBA(Packet): ] -bind_layers(UDS, UDS_WMBA, service=0x3D) +bind_layers(UDS, UDS_WMBA, service=0x3d) +UDS._service_cls[0x3d] = UDS_WMBA class UDS_WMBAPR(Packet): name = 'WriteMemoryByAddressPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7d, UDS.services), _uds_slm), BitField('memorySizeLen', 0, 4), BitField('memoryAddressLen', 0, 4), ConditionalField(XByteField('memoryAddress1', 0), @@ -907,13 +1023,14 @@ def answers(self, other): and other.memoryAddressLen == self.memoryAddressLen -bind_layers(UDS, UDS_WMBAPR, service=0x7D) +bind_layers(UDS, UDS_WMBAPR, service=0x7d) +UDS._service_cls[0x7d] = UDS_WMBAPR # ##########################DTC##################################### class DTC(Packet): name = 'Diagnostic Trouble Code' - dtc_descriptions = {} # Customize this dictionary for each individual ECU / OEM + dtc_descriptions = {} # type: Dict[int, str] fields_desc = [ BitEnumField("system", 0, 2, { @@ -938,6 +1055,7 @@ def extract_padding(self, s): class UDS_CDTCI(Packet): name = 'ClearDiagnosticInformation' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x14, UDS.services), _uds_slm), ByteField('groupOfDTCHighByte', 0), ByteField('groupOfDTCMiddleByte', 0), ByteField('groupOfDTCLowByte', 0), @@ -945,9 +1063,13 @@ class UDS_CDTCI(Packet): bind_layers(UDS, UDS_CDTCI, service=0x14) +UDS._service_cls[0x14] = UDS_CDTCI class UDS_CDTCIPR(Packet): + fields_desc = [ + ConditionalField(XByteEnumField('service', 0x54, UDS.services), _uds_slm), + ] name = 'ClearDiagnosticInformationPositiveResponse' def answers(self, other): @@ -955,6 +1077,7 @@ def answers(self, other): bind_layers(UDS, UDS_CDTCIPR, service=0x54) +UDS._service_cls[0x54] = UDS_CDTCIPR # #########################RDTCI################################### @@ -1012,6 +1135,7 @@ class UDS_RDTCI(Packet): } name = 'ReadDTCInformation' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x19, UDS.services), _uds_slm), ByteEnumField('reportType', 0, reportTypes), ConditionalField(FlagsField('DTCSeverityMask', 0, 8, dtcSeverityMask), lambda pkt: pkt.reportType in [0x07, 0x08]), @@ -1029,6 +1153,7 @@ class UDS_RDTCI(Packet): bind_layers(UDS, UDS_RDTCI, service=0x19) +UDS._service_cls[0x19] = UDS_RDTCI class DTCAndStatusRecord(Packet): @@ -1062,7 +1187,7 @@ class DTCExtendedDataRecord(Packet): class DTCSnapshot(Packet): - identifiers = defaultdict(list) # for later extension + identifiers = defaultdict(list) # type: Dict[int, list] # for later extension @staticmethod def next_identifier_cb(pkt, lst, cur, remain): @@ -1091,6 +1216,7 @@ class DTCSnapshotRecord(Packet): class UDS_RDTCIPR(Packet): name = 'ReadDTCInformationPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x59, UDS.services), _uds_slm), ByteEnumField('reportType', 0, UDS_RDTCI.reportTypes), ConditionalField( FlagsField('DTCStatusAvailabilityMask', 0, 8, UDS_RDTCI.dtcStatus), @@ -1140,6 +1266,7 @@ def answers(self, other): bind_layers(UDS, UDS_RDTCIPR, service=0x59) +UDS._service_cls[0x59] = UDS_RDTCIPR # #########################RC################################### @@ -1153,17 +1280,20 @@ class UDS_RC(Packet): routineControlIdentifiers = ObservableDict() name = 'RoutineControl' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x31, UDS.services), _uds_slm), ByteEnumField('routineControlType', 0, routineControlTypes), XShortEnumField('routineIdentifier', 0, routineControlIdentifiers) ] bind_layers(UDS, UDS_RC, service=0x31) +UDS._service_cls[0x31] = UDS_RC class UDS_RCPR(Packet): name = 'RoutineControlPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x71, UDS.services), _uds_slm), ByteEnumField('routineControlType', 0, UDS_RC.routineControlTypes), XShortEnumField('routineIdentifier', 0, UDS_RC.routineControlIdentifiers), @@ -1181,6 +1311,7 @@ def answers(self, other): bind_layers(UDS, UDS_RCPR, service=0x71) +UDS._service_cls[0x71] = UDS_RCPR # #########################RD################################### @@ -1190,6 +1321,7 @@ class UDS_RD(Packet): }) name = 'RequestDownload' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x34, UDS.services), _uds_slm), ByteEnumField('dataFormatIdentifier', 0, dataFormatIdentifiers), BitField('memorySizeLen', 0, 4), BitField('memoryAddressLen', 0, 4), @@ -1213,11 +1345,13 @@ class UDS_RD(Packet): bind_layers(UDS, UDS_RD, service=0x34) +UDS._service_cls[0x34] = UDS_RD class UDS_RDPR(Packet): name = 'RequestDownloadPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x74, UDS.services), _uds_slm), BitField('memorySizeLen', 0, 4), BitField('reserved', 0, 4), StrField('maxNumberOfBlockLength', b"", fmt="B"), @@ -1228,12 +1362,14 @@ def answers(self, other): bind_layers(UDS, UDS_RDPR, service=0x74) +UDS._service_cls[0x74] = UDS_RDPR # #########################RU################################### class UDS_RU(Packet): name = 'RequestUpload' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x35, UDS.services), _uds_slm), ByteEnumField('dataFormatIdentifier', 0, UDS_RD.dataFormatIdentifiers), BitField('memorySizeLen', 0, 4), @@ -1258,11 +1394,13 @@ class UDS_RU(Packet): bind_layers(UDS, UDS_RU, service=0x35) +UDS._service_cls[0x35] = UDS_RU class UDS_RUPR(Packet): name = 'RequestUploadPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x75, UDS.services), _uds_slm), BitField('memorySizeLen', 0, 4), BitField('reserved', 0, 4), StrField('maxNumberOfBlockLength', b"", fmt="B"), @@ -1273,23 +1411,27 @@ def answers(self, other): bind_layers(UDS, UDS_RUPR, service=0x75) +UDS._service_cls[0x75] = UDS_RUPR # #########################TD################################### class UDS_TD(Packet): name = 'TransferData' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x36, UDS.services), _uds_slm), ByteField('blockSequenceCounter', 0), StrField('transferRequestParameterRecord', b"", fmt="B") ] bind_layers(UDS, UDS_TD, service=0x36) +UDS._service_cls[0x36] = UDS_TD class UDS_TDPR(Packet): name = 'TransferDataPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x76, UDS.services), _uds_slm), ByteField('blockSequenceCounter', 0), StrField('transferResponseParameterRecord', b"", fmt="B") ] @@ -1300,22 +1442,26 @@ def answers(self, other): bind_layers(UDS, UDS_TDPR, service=0x76) +UDS._service_cls[0x76] = UDS_TDPR # #########################RTE################################### class UDS_RTE(Packet): name = 'RequestTransferExit' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x37, UDS.services), _uds_slm), StrField('transferRequestParameterRecord', b"", fmt="B") ] bind_layers(UDS, UDS_RTE, service=0x37) +UDS._service_cls[0x37] = UDS_RTE class UDS_RTEPR(Packet): name = 'RequestTransferExitPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x77, UDS.services), _uds_slm), StrField('transferResponseParameterRecord', b"", fmt="B") ] @@ -1324,6 +1470,7 @@ def answers(self, other): bind_layers(UDS, UDS_RTEPR, service=0x77) +UDS._service_cls[0x77] = UDS_RTEPR # #########################RFT################################### @@ -1344,6 +1491,7 @@ def _contains_file_size(packet): return packet.modeOfOperation not in [2, 4, 5] fields_desc = [ + ConditionalField(XByteEnumField('service', 0x38, UDS.services), _uds_slm), XByteEnumField('modeOfOperation', 0, modeOfOperations), FieldLenField('filePathAndNameLength', None, length_of='filePathAndName', fmt='H'), @@ -1369,6 +1517,7 @@ def _contains_file_size(packet): bind_layers(UDS, UDS_RFT, service=0x38) +UDS._service_cls[0x38] = UDS_RFT class UDS_RFTPR(Packet): @@ -1379,6 +1528,7 @@ def _contains_data_format_identifier(packet): return packet.modeOfOperation != 0x02 fields_desc = [ + ConditionalField(XByteEnumField('service', 0x78, UDS.services), _uds_slm), XByteEnumField('modeOfOperation', 0, UDS_RFT.modeOfOperations), ConditionalField(FieldLenField('lengthFormatIdentifier', None, length_of='maxNumberOfBlockLength', @@ -1411,22 +1561,26 @@ def answers(self, other): bind_layers(UDS, UDS_RFTPR, service=0x78) +UDS._service_cls[0x78] = UDS_RFTPR # #########################IOCBI################################### class UDS_IOCBI(Packet): name = 'InputOutputControlByIdentifier' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x2f, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), ] -bind_layers(UDS, UDS_IOCBI, service=0x2F) +bind_layers(UDS, UDS_IOCBI, service=0x2f) +UDS._service_cls[0x2f] = UDS_IOCBI class UDS_IOCBIPR(Packet): name = 'InputOutputControlByIdentifierPositiveResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x6f, UDS.services), _uds_slm), XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), ] @@ -1435,7 +1589,8 @@ def answers(self, other): and other.dataIdentifier == self.dataIdentifier -bind_layers(UDS, UDS_IOCBIPR, service=0x6F) +bind_layers(UDS, UDS_IOCBIPR, service=0x6f) +UDS._service_cls[0x6f] = UDS_IOCBIPR # #########################NR################################### @@ -1505,6 +1660,7 @@ class UDS_NR(Packet): } name = 'NegativeResponse' fields_desc = [ + ConditionalField(XByteEnumField('service', 0x7f, UDS.services), _uds_slm), XByteEnumField('requestServiceId', 0, UDS.services), ByteEnumField('negativeResponseCode', 0, negativeResponseCodes) ] @@ -1516,6 +1672,7 @@ def answers(self, other): bind_layers(UDS, UDS_NR, service=0x7f) +UDS._service_cls[0x7f] = UDS_NR # ################################################################## diff --git a/test/contrib/automotive/gm/gmlan.uts b/test/contrib/automotive/gm/gmlan.uts index 722b2bc974d..9b8d0a175aa 100644 --- a/test/contrib/automotive/gm/gmlan.uts +++ b/test/contrib/automotive/gm/gmlan.uts @@ -612,4 +612,128 @@ log = get_log(pkt) print(log) assert len(log) == 2 assert log[1] == "0x80" -assert log[0] == "DeviceControlPositiveResponse" \ No newline at end of file +assert log[0] == "DeviceControlPositiveResponse" ++ Single layer GMLAN mode + += Single layer mode: enable and basic dissect + +conf.contribs['GMLAN']['single_layer_mode'] = True + +ido = GMLAN(b'\x10\x02') +assert isinstance(ido, GMLAN_IDO), "Expected GMLAN_IDO, got %s" % type(ido) +assert ido.service == 0x10 +assert ido.subfunction == 0x02 + += Single layer mode: build GMLAN_IDO + +ido_built = GMLAN_IDO(subfunction=0x02) +assert bytes(ido_built) == b'\x10\x02', "Expected b'\\x10\\x02', got %s" % bytes(ido_built).hex() + += Single layer mode: dissect positive response (using SA which has a PR class) + +sapr = GMLAN(b'\x67\x01\xde\xad') +assert isinstance(sapr, GMLAN_SAPR), "Expected GMLAN_SAPR, got %s" % type(sapr) +assert sapr.service == 0x67 +assert sapr.subfunction == 0x01 + += Single layer mode: NegativeResponse dissect + +nr = GMLAN(b'\x7f\x10\x22') +assert isinstance(nr, GMLAN_NR), "Expected GMLAN_NR, got %s" % type(nr) +assert nr.service == 0x7f +assert nr.requestServiceId == 0x10 +assert nr.returnCode == 0x22 + += Single layer mode: NegativeResponse answers() + +ido2 = GMLAN_IDO(subfunction=0x02) +nr2 = GMLAN_NR(requestServiceId=0x10, returnCode=0x22) +assert nr2.answers(ido2) + += Single layer mode: hashret consistency between request and positive response (SA) + +sa3 = GMLAN_SA(subfunction=0x01) +sapr3 = GMLAN_SAPR(subfunction=0x01) +assert sa3.hashret() == sapr3.hashret(), \ + "hashret mismatch: %s vs %s" % (sa3.hashret().hex(), sapr3.hashret().hex()) + += Single layer mode: sub-subpacket bindings are unaffected + +rfrdpr = GMLAN(b'\x52\x01\x00\x01\x02\x03\x04') +assert isinstance(rfrdpr, GMLAN_RFRDPR), "Expected GMLAN_RFRDPR, got %s" % type(rfrdpr) + += Single layer mode: unknown service falls back to GMLAN + +unknown = GMLAN(b'\xBB\x01\x02') +assert isinstance(unknown, GMLAN), "Expected GMLAN fallback, got %s" % type(unknown) + += Single layer mode: switch back to multi-layer mode + +conf.contribs['GMLAN']['single_layer_mode'] = False + +ido4 = GMLAN(b'\x10\x02') +assert ido4.__class__ == GMLAN +assert ido4.service == 0x10 +assert ido4[GMLAN_IDO].subfunction == 0x02 + += Single layer mode: cleanup + +conf.contribs['GMLAN']['single_layer_mode'] = False +assert not conf.contribs['GMLAN']['single_layer_mode'] + ++ Compatibility mode GMLAN + += Compatibility mode: setup - enable both SLM and compat mode (default) + +conf.contribs['GMLAN']['single_layer_mode'] = True +conf.contribs['GMLAN']['compatibility_mode'] = True + += Compatibility mode ON + SLM ON: standalone sub-packet has service field + +ido_sa = GMLAN_IDO(subfunction=0x02) +assert bytes(ido_sa) == b'\x10\x02', \ + "Standalone GMLAN_IDO should include service byte, got %s" % bytes(ido_sa).hex() +assert ido_sa.service == 0x10 + += Compatibility mode ON + SLM ON: stacked sub-packet suppresses its service field + +stacked = GMLAN() / GMLAN_IDO(subfunction=0x02) +assert bytes(stacked) == b'\x10\x02', \ + "Stacked GMLAN/GMLAN_IDO should produce 2 bytes (no duplicate service), got %s" % bytes(stacked).hex() + += Compatibility mode ON + SLM ON: dissect standalone still works + +ido_dis = GMLAN(b'\x10\x02') +assert isinstance(ido_dis, GMLAN_IDO) +assert ido_dis.service == 0x10 +assert ido_dis.subfunction == 0x02 + += Compatibility mode OFF + SLM ON: stacked sub-packet always emits service field + +conf.contribs['GMLAN']['compatibility_mode'] = False + +stacked_nc = GMLAN() / GMLAN_IDO(subfunction=0x02) +assert bytes(stacked_nc) == b'\x10\x10\x02', \ + "With compat OFF, stacked GMLAN/GMLAN_IDO should produce 3 bytes, got %s" % bytes(stacked_nc).hex() + += Compatibility mode OFF + SLM ON: standalone sub-packet also has service field + +ido_nc = GMLAN_IDO(subfunction=0x02) +assert bytes(ido_nc) == b'\x10\x02', \ + "Standalone GMLAN_IDO should include service byte even with compat OFF, got %s" % bytes(ido_nc).hex() + += Compatibility mode: SLM OFF overrides compat mode - no service field in sub-packet + +conf.contribs['GMLAN']['single_layer_mode'] = False +conf.contribs['GMLAN']['compatibility_mode'] = False + +stacked_slm_off = GMLAN() / GMLAN_IDO(subfunction=0x02) +assert bytes(stacked_slm_off) == b'\x10\x02', \ + "With SLM OFF, no service field in GMLAN_IDO regardless of compat mode, got %s" % bytes(stacked_slm_off).hex() + += Compatibility mode: cleanup + +conf.contribs['GMLAN']['single_layer_mode'] = False +conf.contribs['GMLAN']['compatibility_mode'] = True +assert not conf.contribs['GMLAN']['single_layer_mode'] +assert conf.contribs['GMLAN']['compatibility_mode'] diff --git a/test/contrib/automotive/kwp.uts b/test/contrib/automotive/kwp.uts index d525b8554e6..73c99dff1a2 100644 --- a/test/contrib/automotive/kwp.uts +++ b/test/contrib/automotive/kwp.uts @@ -507,3 +507,144 @@ nrc = KWP(b'\x7f\x22\x33') assert nrc.service == 0x7f assert nrc.requestServiceId == 0x22 assert nrc.negativeResponseCode == 0x33 + ++ Single layer KWP mode + += Single layer mode: enable and basic dissect + +conf.contribs['KWP']['single_layer_mode'] = True + +sds = KWP(b'\x10\x01') +assert isinstance(sds, KWP_SDS), "Expected KWP_SDS, got %s" % type(sds) +assert sds.service == 0x10 +assert sds.diagnosticSession == 0x01 + += Single layer mode: build KWP_SDS + +sds_built = KWP_SDS(diagnosticSession=0x01) +assert bytes(sds_built) == b'\x10\x01', "Expected b'\\x10\\x01', got %s" % bytes(sds_built).hex() + += Single layer mode: dissect positive response + +sdspr = KWP(b'\x50\x01\xbe\xef') +assert isinstance(sdspr, KWP_SDSPR), "Expected KWP_SDSPR, got %s" % type(sdspr) +assert sdspr.service == 0x50 +assert sdspr.diagnosticSession == 0x01 + += Single layer mode: answers() between subpackets + +sds2 = KWP_SDS(diagnosticSession=0x01) +sdspr2 = KWP_SDSPR(diagnosticSession=0x01) +assert sdspr2.answers(sds2) + += Single layer mode: NegativeResponse dissect + +nr = KWP(b'\x7f\x10\x22') +assert isinstance(nr, KWP_NR), "Expected KWP_NR, got %s" % type(nr) +assert nr.service == 0x7f +assert nr.requestServiceId == 0x10 +assert nr.negativeResponseCode == 0x22 + += Single layer mode: NegativeResponse answers() + +sds3 = KWP_SDS(diagnosticSession=0x01) +nr2 = KWP_NR(requestServiceId=0x10, negativeResponseCode=0x22) +assert nr2.answers(sds3) + += Single layer mode: hashret consistency between request and positive response + +sds4 = KWP_SDS(diagnosticSession=0x01) +sdspr4 = KWP_SDSPR(diagnosticSession=0x01) +assert sds4.hashret() == sdspr4.hashret(), \ + "hashret mismatch: %s vs %s" % (sds4.hashret().hex(), sdspr4.hashret().hex()) + += Single layer mode: unknown service falls back to KWP + +unknown = KWP(b'\xAA\x01\x02') +assert isinstance(unknown, KWP), "Expected KWP fallback, got %s" % type(unknown) + += Single layer mode: switch back to multi-layer mode + +conf.contribs['KWP']['single_layer_mode'] = False + +sds5 = KWP(b'\x10\x01') +assert sds5.__class__ == KWP +assert sds5.service == 0x10 +assert sds5[KWP_SDS].diagnosticSession == 0x01 + += Single layer mode: idempotency + +conf.contribs['KWP']['single_layer_mode'] = True +conf.contribs['KWP']['single_layer_mode'] = True +sds6 = KWP(b'\x10\x01') +assert isinstance(sds6, KWP_SDS) + +conf.contribs['KWP']['single_layer_mode'] = False +conf.contribs['KWP']['single_layer_mode'] = False +sds7 = KWP(b'\x10\x01') +assert sds7.__class__ == KWP +count = sum(1 for fval, cls in KWP.payload_guess + if fval.get('service') == 0x10 and cls == KWP_SDS) +assert count == 1, "Expected 1 binding for KWP_SDS, got %d" % count + += Single layer mode: cleanup + +conf.contribs['KWP']['single_layer_mode'] = False +assert not conf.contribs['KWP']['single_layer_mode'] + ++ Compatibility mode KWP + += Compatibility mode: setup - enable both SLM and compat mode (default) + +conf.contribs['KWP']['single_layer_mode'] = True +conf.contribs['KWP']['compatibility_mode'] = True + += Compatibility mode ON + SLM ON: standalone sub-packet has service field + +sds_sa = KWP_SDS(diagnosticSession=0x01) +assert bytes(sds_sa) == b'\x10\x01', \ + "Standalone KWP_SDS should include service byte, got %s" % bytes(sds_sa).hex() +assert sds_sa.service == 0x10 + += Compatibility mode ON + SLM ON: stacked sub-packet suppresses its service field + +stacked = KWP() / KWP_SDS(diagnosticSession=0x01) +assert bytes(stacked) == b'\x10\x01', \ + "Stacked KWP/KWP_SDS should produce 2 bytes (no duplicate service), got %s" % bytes(stacked).hex() + += Compatibility mode ON + SLM ON: dissect standalone still works + +sds_dis = KWP(b'\x10\x01') +assert isinstance(sds_dis, KWP_SDS) +assert sds_dis.service == 0x10 +assert sds_dis.diagnosticSession == 0x01 + += Compatibility mode OFF + SLM ON: stacked sub-packet always emits service field + +conf.contribs['KWP']['compatibility_mode'] = False + +stacked_nc = KWP() / KWP_SDS(diagnosticSession=0x01) +assert bytes(stacked_nc) == b'\x10\x10\x01', \ + "With compat OFF, stacked KWP/KWP_SDS should produce 3 bytes, got %s" % bytes(stacked_nc).hex() + += Compatibility mode OFF + SLM ON: standalone sub-packet also has service field + +sds_nc = KWP_SDS(diagnosticSession=0x01) +assert bytes(sds_nc) == b'\x10\x01', \ + "Standalone KWP_SDS should include service byte even with compat OFF, got %s" % bytes(sds_nc).hex() + += Compatibility mode: SLM OFF overrides compat mode - no service field in sub-packet + +conf.contribs['KWP']['single_layer_mode'] = False +conf.contribs['KWP']['compatibility_mode'] = False + +stacked_slm_off = KWP() / KWP_SDS(diagnosticSession=0x01) +assert bytes(stacked_slm_off) == b'\x10\x01', \ + "With SLM OFF, no service field in KWP_SDS regardless of compat mode, got %s" % bytes(stacked_slm_off).hex() + += Compatibility mode: cleanup + +conf.contribs['KWP']['single_layer_mode'] = False +conf.contribs['KWP']['compatibility_mode'] = True +assert not conf.contribs['KWP']['single_layer_mode'] +assert conf.contribs['KWP']['compatibility_mode'] diff --git a/test/contrib/automotive/obd/obd.uts b/test/contrib/automotive/obd/obd.uts index fa65e95e447..2c5e8e71e11 100644 --- a/test/contrib/automotive/obd/obd.uts +++ b/test/contrib/automotive/obd/obd.uts @@ -1031,3 +1031,127 @@ assert b[22:] == b'ABCDEFGHIJKLMNOP' r = OBD(b'\x09\x02\x04') assert p.answers(r) + ++ Single layer OBD mode + += Single layer mode: enable and basic dissect + +conf.contribs['OBD']['single_layer_mode'] = True + +s01 = OBD(b'\x01\x0c') +assert isinstance(s01, OBD_S01), "Expected OBD_S01, got %s" % type(s01) +assert s01.service == 0x01 + += Single layer mode: build OBD_S01 + +s01_built = OBD_S01(pid=[0x0c]) +assert bytes(s01_built) == b'\x01\x0c', "Expected b'\\x01\\x0c', got %s" % bytes(s01_built).hex() + += Single layer mode: dissect positive response + +s01pr = OBD(b'\x41\x0c\x0f\xa0') +assert isinstance(s01pr, OBD_S01_PR), "Expected OBD_S01_PR, got %s" % type(s01pr) +assert s01pr.service == 0x41 + += Single layer mode: NegativeResponse dissect + +nr = OBD(b'\x7f\x01\x22') +assert isinstance(nr, OBD_NR), "Expected OBD_NR, got %s" % type(nr) +assert nr.service == 0x7f +assert nr.request_service_id == 0x01 +assert nr.response_code == 0x22 + += Single layer mode: NegativeResponse answers() + +s01_2 = OBD_S01(pid=[0x0c]) +nr2 = OBD_NR(request_service_id=0x01, response_code=0x22) +assert nr2.answers(s01_2) + += Single layer mode: hashret consistency between request and positive response + +s09 = OBD_S09(iid=[0x02]) +s09pr = OBD_S09_PR() +assert s09.hashret() == s09pr.hashret(), \ + "hashret mismatch: %s vs %s" % (s09.hashret().hex(), s09pr.hashret().hex()) + += Single layer mode: unknown service falls back to OBD + +unknown = OBD(b'\xBB\x01\x02') +assert isinstance(unknown, OBD), "Expected OBD fallback, got %s" % type(unknown) + += Single layer mode: switch back to multi-layer mode + +conf.contribs['OBD']['single_layer_mode'] = False + +s01_3 = OBD(b'\x01\x0c') +assert s01_3.__class__ == OBD +assert s01_3.service == 0x01 +assert isinstance(s01_3[OBD_S01], OBD_S01) + += Single layer mode: cleanup + +conf.contribs['OBD']['single_layer_mode'] = False +assert not conf.contribs['OBD']['single_layer_mode'] + ++ Compatibility mode OBD + += Compatibility mode: setup - enable both SLM and compat mode (default) + +conf.contribs['OBD']['single_layer_mode'] = True +conf.contribs['OBD']['compatibility_mode'] = True + += Compatibility mode ON + SLM ON: standalone sub-packet has service field + +s01_sa = OBD_S01(pid=[0x0c]) +assert bytes(s01_sa)[0:1] == b'\x01', \ + "Standalone OBD_S01 should include service byte 0x01, got %s" % bytes(s01_sa).hex() + += Compatibility mode ON + SLM ON: stacked sub-packet suppresses its service field + +stacked = OBD() / OBD_S01(pid=[0x0c]) +stacked_bytes = bytes(stacked) +assert stacked_bytes[0:1] == b'\x01', \ + "Stacked OBD/OBD_S01 first byte should be 0x01 (OBD service), got %s" % stacked_bytes.hex() +assert stacked_bytes[1:2] != b'\x01', \ + "No duplicate service byte expected, got %s" % stacked_bytes.hex() +assert len(stacked_bytes) == 2, \ + "Stacked OBD/OBD_S01(pid=[0x0c]) should be 2 bytes, got %s" % stacked_bytes.hex() + += Compatibility mode ON + SLM ON: dissect standalone still works + +s01_dis = OBD(b'\x01\x0c') +assert isinstance(s01_dis, OBD_S01) + += Compatibility mode OFF + SLM ON: stacked sub-packet always emits service field + +conf.contribs['OBD']['compatibility_mode'] = False + +stacked_nc = OBD() / OBD_S01(pid=[0x0c]) +stacked_nc_bytes = bytes(stacked_nc) +assert len(stacked_nc_bytes) == 3, \ + "With compat OFF, stacked OBD/OBD_S01 should produce 3 bytes (duplicate service), got %s" % stacked_nc_bytes.hex() +assert stacked_nc_bytes[0:1] == b'\x01' and stacked_nc_bytes[1:2] == b'\x01', \ + "With compat OFF, first two bytes should both be service 0x01, got %s" % stacked_nc_bytes.hex() + += Compatibility mode OFF + SLM ON: standalone sub-packet also has service field + +s01_nc = OBD_S01(pid=[0x0c]) +assert bytes(s01_nc)[0:1] == b'\x01', \ + "Standalone OBD_S01 should include service byte even with compat OFF" + += Compatibility mode: SLM OFF overrides compat mode - no service field in sub-packet + +conf.contribs['OBD']['single_layer_mode'] = False +conf.contribs['OBD']['compatibility_mode'] = False + +stacked_slm_off = OBD() / OBD_S01(pid=[0x0c]) +slm_off_bytes = bytes(stacked_slm_off) +assert len(slm_off_bytes) == 2, \ + "With SLM OFF, no service field in OBD_S01 regardless of compat mode, got %s" % slm_off_bytes.hex() + += Compatibility mode: cleanup + +conf.contribs['OBD']['single_layer_mode'] = False +conf.contribs['OBD']['compatibility_mode'] = True +assert not conf.contribs['OBD']['single_layer_mode'] +assert conf.contribs['OBD']['compatibility_mode'] diff --git a/test/contrib/automotive/uds.uts b/test/contrib/automotive/uds.uts index 1545076039d..e4a6bf07ba8 100644 --- a/test/contrib/automotive/uds.uts +++ b/test/contrib/automotive/uds.uts @@ -1438,3 +1438,179 @@ nrc = UDS(b'\x7f\x22\x33') assert nrc.service == 0x7f assert nrc.requestServiceId == 0x22 assert nrc.negativeResponseCode == 0x33 + ++ Single layer UDS mode + += Single layer mode: enable and basic dissect + +conf.contribs['UDS']['single_layer_mode'] = True + +dsc = UDS(b'\x10\x01') +assert isinstance(dsc, UDS_DSC), "Expected UDS_DSC, got %s" % type(dsc) +assert dsc.service == 0x10 +assert dsc.diagnosticSessionType == 0x01 + += Single layer mode: build UDS_DSC + +dsc_built = UDS_DSC(diagnosticSessionType=0x01) +assert bytes(dsc_built) == b'\x10\x01', "Expected b'\\x10\\x01', got %s" % bytes(dsc_built).hex() + += Single layer mode: UDS() / UDS_DSC() still works in single layer mode + +dsc_two = UDS_DSC(service=0x10, diagnosticSessionType=0x01) +assert dsc_two.service == 0x10 +assert dsc_two.diagnosticSessionType == 0x01 + += Single layer mode: dissect positive response + +dscpr = UDS(b'\x50\x01beef') +assert isinstance(dscpr, UDS_DSCPR), "Expected UDS_DSCPR, got %s" % type(dscpr) +assert dscpr.service == 0x50 +assert dscpr.diagnosticSessionType == 0x01 +assert dscpr.sessionParameterRecord == b"beef" + += Single layer mode: answers() between subpackets + +dsc = UDS_DSC(diagnosticSessionType=0x01) +dscpr = UDS_DSCPR(diagnosticSessionType=0x01, sessionParameterRecord=b"beef") +assert dscpr.answers(dsc) + += Single layer mode: answers() negative (different session type) + +dsc2 = UDS_DSC(diagnosticSessionType=0x02) +dscpr2 = UDS_DSCPR(diagnosticSessionType=0x01) +assert not dscpr2.answers(dsc2) + += Single layer mode: NegativeResponse dissect + +nr = UDS(b'\x7f\x10\x22') +assert isinstance(nr, UDS_NR), "Expected UDS_NR, got %s" % type(nr) +assert nr.service == 0x7f +assert nr.requestServiceId == 0x10 +assert nr.negativeResponseCode == 0x22 + += Single layer mode: NegativeResponse answers() + +dsc3 = UDS_DSC(diagnosticSessionType=0x01) +nr2 = UDS_NR(requestServiceId=0x10, negativeResponseCode=0x22) +assert nr2.answers(dsc3) + += Single layer mode: NegativeResponse does not answer wrong service + +er = UDS_ER(resetType=0x01) +assert not nr2.answers(er) + += Single layer mode: hashret consistency between request and positive response + +dsc4 = UDS_DSC(diagnosticSessionType=0x01) +dscpr4 = UDS_DSCPR(diagnosticSessionType=0x01) +assert dsc4.hashret() == dscpr4.hashret(), \ + "hashret mismatch: %s vs %s" % (dsc4.hashret().hex(), dscpr4.hashret().hex()) + += Single layer mode: UDS_RDBI dissect + +rdbi = UDS(b'\x22\x01\x02\x03\x04') +assert isinstance(rdbi, UDS_RDBI), "Expected UDS_RDBI, got %s" % type(rdbi) +assert rdbi.service == 0x22 + += Single layer mode: unknown service falls back to UDS + +unknown = UDS(b'\xAA\x01\x02') +assert isinstance(unknown, UDS), "Expected UDS fallback, got %s" % type(unknown) + += Single layer mode: switch back to multi-layer mode + +conf.contribs['UDS']['single_layer_mode'] = False + +dsc5 = UDS(b'\x10\x01') +assert dsc5.__class__ == UDS +assert dsc5.service == 0x10 +assert dsc5[UDS_DSC].diagnosticSessionType == 0x01 + +dscpr5 = UDS(b'\x50\x01beef') +assert dscpr5.__class__ == UDS +assert dscpr5.service == 0x50 +assert dscpr5[UDS_DSCPR].diagnosticSessionType == 0x01 + += Single layer mode: enable via conf directly + +conf.contribs['UDS']['single_layer_mode'] = True + +er6 = UDS(b'\x11\x01') +assert isinstance(er6, UDS_ER), "Expected UDS_ER, got %s" % type(er6) +assert er6.service == 0x11 +assert er6.resetType == 0x01 + += Single layer mode: final cleanup - restore default multi-layer mode + +conf.contribs['UDS']['single_layer_mode'] = False +assert not conf.contribs['UDS']['single_layer_mode'] + ++ Compatibility mode UDS + += Compatibility mode: setup - enable both SLM and compat mode (default) + +conf.contribs['UDS']['single_layer_mode'] = True +conf.contribs['UDS']['compatibility_mode'] = True + += Compatibility mode ON + SLM ON: standalone sub-packet has service field + +dsc_sa = UDS_DSC(diagnosticSessionType=0x01) +assert bytes(dsc_sa) == b'\x10\x01', \ + "Standalone UDS_DSC should include service byte, got %s" % bytes(dsc_sa).hex() +assert dsc_sa.service == 0x10 + += Compatibility mode ON + SLM ON: stacked sub-packet suppresses its service field + +stacked = UDS() / UDS_DSC(diagnosticSessionType=0x01) +assert bytes(stacked) == b'\x10\x01', \ + "Stacked UDS/UDS_DSC should produce 2 bytes (no duplicate service), got %s" % bytes(stacked).hex() + += Compatibility mode ON + SLM ON: positive response stacked suppresses service + +stacked_pr = UDS() / UDS_DSCPR(diagnosticSessionType=0x01, sessionParameterRecord=b"") +assert bytes(stacked_pr) == b'\x50\x01', \ + "Stacked UDS/UDS_DSCPR should produce 2 bytes, got %s" % bytes(stacked_pr).hex() + += Compatibility mode ON + SLM ON: dissect standalone still works + +dsc_dis = UDS(b'\x10\x01') +assert isinstance(dsc_dis, UDS_DSC) +assert dsc_dis.service == 0x10 +assert dsc_dis.diagnosticSessionType == 0x01 + += Compatibility mode OFF + SLM ON: stacked sub-packet always emits service field + +conf.contribs['UDS']['compatibility_mode'] = False + +stacked_nc = UDS() / UDS_DSC(diagnosticSessionType=0x01) +assert bytes(stacked_nc) == b'\x10\x10\x01', \ + "With compat OFF, stacked UDS/UDS_DSC should produce 3 bytes (duplicate service), got %s" % bytes(stacked_nc).hex() + += Compatibility mode OFF + SLM ON: standalone sub-packet also has service field + +dsc_nc = UDS_DSC(diagnosticSessionType=0x01) +assert bytes(dsc_nc) == b'\x10\x01', \ + "Standalone UDS_DSC should include service byte even with compat OFF, got %s" % bytes(dsc_nc).hex() + += Compatibility mode OFF + SLM ON: dissect standalone still works + +dsc_dis2 = UDS(b'\x10\x01') +assert isinstance(dsc_dis2, UDS_DSC) +assert dsc_dis2.service == 0x10 + += Compatibility mode: SLM OFF overrides compat mode - no service field in sub-packet + +conf.contribs['UDS']['single_layer_mode'] = False +conf.contribs['UDS']['compatibility_mode'] = False + +stacked_slm_off = UDS() / UDS_DSC(diagnosticSessionType=0x01) +assert bytes(stacked_slm_off) == b'\x10\x01', \ + "With SLM OFF, no service field in UDS_DSC regardless of compat mode, got %s" % bytes(stacked_slm_off).hex() + += Compatibility mode: cleanup + +conf.contribs['UDS']['single_layer_mode'] = False +conf.contribs['UDS']['compatibility_mode'] = True +assert not conf.contribs['UDS']['single_layer_mode'] +assert conf.contribs['UDS']['compatibility_mode']