diff --git a/scapy/layers/quic.py b/scapy/layers/quic.py index 03d2b25ec94..d5060a85380 100644 --- a/scapy/layers/quic.py +++ b/scapy/layers/quic.py @@ -46,6 +46,11 @@ ) # RFC9000 table 3 +""" + RFC 9000 Sect 19.19 Connection close values: + 0x1c is used to signal errors at only the QUIC layer, or the absence of errors. + 0x1d is used to signal an error with the application that uses QUIC. +""" _quic_payloads = { 0x00: "PADDING", 0x01: "PING", @@ -66,6 +71,7 @@ 0x1A: "PATH_CHALLENGE", 0x1B: "PATH_RESPONSE", 0x1C: "CONNECTION_CLOSE", + 0x1D: "CONNECTION_CLOSE", 0x1E: "HANDSHAKE_DONE", } @@ -124,8 +130,49 @@ def i2repr( return _EnumField.i2repr(self, pkt, x) -# -- Headers -- +class QuicLongPacketTypeField(BitField): + """ + 2-bit field whose meaning depends on pkt.Version (v1 vs v2). + - For parsing: store raw bits. + - For display: show the correct name based on Version. + - For building: if None, default to Initial bits (v1->0, v2->1). + """ + def i2m(self, pkt, x): + if x is None and pkt is not None: + ver = getattr(pkt, "Version", QUIC_V1) + + # default per packet *type* + version + if isinstance(pkt, QUIC_Initial): + return 0x01 if ver == QUIC_V2 else 0x00 + if isinstance(pkt, QUIC_0RTT): + return 0x02 if ver == QUIC_V2 else 0x01 + if isinstance(pkt, QUIC_Handshake): + return 0x03 if ver == QUIC_V2 else 0x02 + if isinstance(pkt, QUIC_Retry): + return 0x00 if ver == QUIC_V2 else 0x03 + + # safe fallback + return 0x00 + + return 0x00 if x is None else x + + def i2repr(self, pkt, x): + ver = getattr(pkt, "Version", None) + table = _quic_long_pkttyp_v2 if ver == QUIC_V2 else _quic_long_pkttyp_v1 + return table.get(x, f"Unknown({x})") + + def any2i(self, pkt, x): + if isinstance(x, str) and pkt is not None: + ver = getattr(pkt, "Version", None) + table = _quic_long_pkttyp_v2 if ver == QUIC_V2 else _quic_long_pkttyp_v1 + inv = {v: k for k, v in table.items()} + return inv[x] + return x + +# -- Headers -- +QUIC_V1 = 0x00000001 +QUIC_V2 = 0x6b3343cf # RFC9000 sect 17.2 _quic_long_hdr = { @@ -133,7 +180,8 @@ def i2repr( 1: "Long", } -_quic_long_pkttyp = { + +_quic_long_pkttyp_v1 = { # RFC9000 table 5 0x00: "Initial", 0x01: "0-RTT", @@ -141,6 +189,16 @@ def i2repr( 0x03: "Retry", } +_quic_long_pkttyp_v2 = { + # RFC9369 3.2 + 0x01: "Initial", + 0x02: "0-RTT", + 0x03: "Handshake", + 0x00: "Retry", +} + + + # RFC9000 sect 17 abstraction @@ -152,6 +210,7 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs): """ Returns the right class for the given data. """ + class_by_name_type = {"Initial":QUIC_Initial, "0-RTT":QUIC_0RTT, "Handshake": QUIC_Handshake, "Retry": QUIC_Retry} # Class type will be consistent for both QUIC versions. if _pkt: hdr = _pkt[0] if hdr & 0x80: @@ -160,12 +219,9 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs): return QUIC_Version else: typ = (hdr & 0x30) >> 4 - return { - 0: QUIC_Initial, - 1: QUIC_0RTT, - 2: QUIC_Handshake, - 3: QUIC_Retry, - }[typ] + ver = struct.unpack("!I", _pkt[1:5])[0] if len(_pkt) >= 5 else QUIC_V1 # Extracting version constant. RFC8999 5.1 Figure 2 && RFC9000 17.2 + typ_in_name = _quic_long_pkttyp_v2.get(typ, "Initial") if ver == QUIC_V2 else _quic_long_pkttyp_v1.get(typ, "Initial") # Default version 1, and match as well for version 2. + return class_by_name_type.get(typ_in_name, QUIC_Initial) else: # Short Header packets return QUIC_1RTT @@ -250,12 +306,12 @@ def i2m(self, pkt, x): class QUIC_Initial(QUIC): name = "QUIC - Initial" - Version = 0x00000001 + Version = QUIC_V1 fields_desc = ( [ BitEnumField("HeaderForm", 1, 1, _quic_long_hdr), BitField("FixedBit", 1, 1), - BitEnumField("LongPacketType", 0, 2, _quic_long_pkttyp), + QuicLongPacketTypeField("LongPacketType", None, 2), BitField("Reserved", 0, 2), QuicPacketNumberBitFieldLenField("PacketNumberLen", None, 2), ] @@ -272,7 +328,7 @@ class QUIC_Initial(QUIC): # RFC9000 sect 17.2.3 class QUIC_0RTT(QUIC): name = "QUIC - 0-RTT" - LongPacketType = 1 + Version = QUIC_V1 fields_desc = QUIC_Initial.fields_desc[:10] + [ QuicVarIntField("Length", 0), QuicPacketNumberField("PacketNumber", 0), @@ -282,15 +338,14 @@ class QUIC_0RTT(QUIC): # RFC9000 sect 17.2.4 class QUIC_Handshake(QUIC): name = "QUIC - Handshake" - LongPacketType = 2 + Version = QUIC_V1 fields_desc = QUIC_0RTT.fields_desc # RFC9000 sect 17.2.5 class QUIC_Retry(QUIC): name = "QUIC - Retry" - LongPacketType = 3 - Version = 0x00000001 + Version = QUIC_V1 fields_desc = ( QUIC_Initial.fields_desc[:3] + [ @@ -335,8 +390,16 @@ class QUIC_ACK(Packet): ByteEnumField("Type", 0x02, _quic_payloads), ] - -# Bindings -# bind_bottom_up(UDP, QUIC, dport=443) -# bind_bottom_up(UDP, QUIC, sport=443) -# bind_layers(UDP, QUIC, dport=443, sport=443) +# RFC9000 sect 19.19 +class QUIC_CONNECTION_CLOSE(Packet): + name = "QUIC - CONNECTION_CLOSE" + fields_desc = [ + QuicVarEnumField("type", 0x1C, { + 0x1C: "CONNECTION_CLOSE", # transport variant + 0x1D: "CONNECTION_CLOSE", # application variant + }), + QuicVarIntField("error_code", 0), + ConditionalField(QuicVarIntField("frame_type", 0), lambda pkt: pkt.type == 0x1C), # Exists only to 0x1C variants. + QuicVarLenField("reason_length", None, length_of="reason_phrase"), + StrLenField("reason_phrase", b"", length_from=lambda pkt: pkt.reason_length), + ]