Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 82 additions & 19 deletions scapy/layers/quic.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
)

# RFC9000 table 3
"""
RFC 9000 Sect 19.19 Connection close values:
0x1c is used to signal errors at only the QUIC layer, or the absence of errors.
0x1d is used to signal an error with the application that uses QUIC.
"""
_quic_payloads = {
0x00: "PADDING",
0x01: "PING",
Expand All @@ -66,6 +71,7 @@
0x1A: "PATH_CHALLENGE",
0x1B: "PATH_RESPONSE",
0x1C: "CONNECTION_CLOSE",
0x1D: "CONNECTION_CLOSE",
0x1E: "HANDSHAKE_DONE",
}

Expand Down Expand Up @@ -124,23 +130,75 @@ def i2repr(
return _EnumField.i2repr(self, pkt, x)


# -- Headers --
class QuicLongPacketTypeField(BitField):
"""
2-bit field whose meaning depends on pkt.Version (v1 vs v2).
- For parsing: store raw bits.
- For display: show the correct name based on Version.
- For building: if None, default to Initial bits (v1->0, v2->1).
"""
def i2m(self, pkt, x):
if x is None and pkt is not None:
ver = getattr(pkt, "Version", QUIC_V1)

# default per packet *type* + version
if isinstance(pkt, QUIC_Initial):
return 0x01 if ver == QUIC_V2 else 0x00
if isinstance(pkt, QUIC_0RTT):
return 0x02 if ver == QUIC_V2 else 0x01
if isinstance(pkt, QUIC_Handshake):
return 0x03 if ver == QUIC_V2 else 0x02
if isinstance(pkt, QUIC_Retry):
return 0x00 if ver == QUIC_V2 else 0x03

# safe fallback
return 0x00

return 0x00 if x is None else x

def i2repr(self, pkt, x):
ver = getattr(pkt, "Version", None)
table = _quic_long_pkttyp_v2 if ver == QUIC_V2 else _quic_long_pkttyp_v1
return table.get(x, f"Unknown({x})")

def any2i(self, pkt, x):
if isinstance(x, str) and pkt is not None:
ver = getattr(pkt, "Version", None)
table = _quic_long_pkttyp_v2 if ver == QUIC_V2 else _quic_long_pkttyp_v1
inv = {v: k for k, v in table.items()}
return inv[x]
return x


# -- Headers --
QUIC_V1 = 0x00000001
QUIC_V2 = 0x6b3343cf

# RFC9000 sect 17.2
_quic_long_hdr = {
0: "Short",
1: "Long",
}

_quic_long_pkttyp = {

_quic_long_pkttyp_v1 = {
# RFC9000 table 5
0x00: "Initial",
0x01: "0-RTT",
0x02: "Handshake",
0x03: "Retry",
}

_quic_long_pkttyp_v2 = {
# RFC9369 3.2
0x01: "Initial",
0x02: "0-RTT",
0x03: "Handshake",
0x00: "Retry",
}



# RFC9000 sect 17 abstraction


Expand All @@ -152,6 +210,7 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs):
"""
Returns the right class for the given data.
"""
class_by_name_type = {"Initial":QUIC_Initial, "0-RTT":QUIC_0RTT, "Handshake": QUIC_Handshake, "Retry": QUIC_Retry} # Class type will be consistent for both QUIC versions.
if _pkt:
hdr = _pkt[0]
if hdr & 0x80:
Expand All @@ -160,12 +219,9 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs):
return QUIC_Version
else:
typ = (hdr & 0x30) >> 4
return {
0: QUIC_Initial,
1: QUIC_0RTT,
2: QUIC_Handshake,
3: QUIC_Retry,
}[typ]
ver = struct.unpack("!I", _pkt[1:5])[0] if len(_pkt) >= 5 else QUIC_V1 # Extracting version constant. RFC8999 5.1 Figure 2 && RFC9000 17.2
typ_in_name = _quic_long_pkttyp_v2.get(typ, "Initial") if ver == QUIC_V2 else _quic_long_pkttyp_v1.get(typ, "Initial") # Default version 1, and match as well for version 2.
return class_by_name_type.get(typ_in_name, QUIC_Initial)
else:
# Short Header packets
return QUIC_1RTT
Expand Down Expand Up @@ -250,12 +306,12 @@ def i2m(self, pkt, x):

class QUIC_Initial(QUIC):
name = "QUIC - Initial"
Version = 0x00000001
Version = QUIC_V1
fields_desc = (
[
BitEnumField("HeaderForm", 1, 1, _quic_long_hdr),
BitField("FixedBit", 1, 1),
BitEnumField("LongPacketType", 0, 2, _quic_long_pkttyp),
QuicLongPacketTypeField("LongPacketType", None, 2),
BitField("Reserved", 0, 2),
QuicPacketNumberBitFieldLenField("PacketNumberLen", None, 2),
]
Expand All @@ -272,7 +328,7 @@ class QUIC_Initial(QUIC):
# RFC9000 sect 17.2.3
class QUIC_0RTT(QUIC):
name = "QUIC - 0-RTT"
LongPacketType = 1
Version = QUIC_V1
fields_desc = QUIC_Initial.fields_desc[:10] + [
QuicVarIntField("Length", 0),
QuicPacketNumberField("PacketNumber", 0),
Expand All @@ -282,15 +338,14 @@ class QUIC_0RTT(QUIC):
# RFC9000 sect 17.2.4
class QUIC_Handshake(QUIC):
name = "QUIC - Handshake"
LongPacketType = 2
Version = QUIC_V1
fields_desc = QUIC_0RTT.fields_desc


# RFC9000 sect 17.2.5
class QUIC_Retry(QUIC):
name = "QUIC - Retry"
LongPacketType = 3
Version = 0x00000001
Version = QUIC_V1
fields_desc = (
QUIC_Initial.fields_desc[:3]
+ [
Expand Down Expand Up @@ -335,8 +390,16 @@ class QUIC_ACK(Packet):
ByteEnumField("Type", 0x02, _quic_payloads),
]


# Bindings
# bind_bottom_up(UDP, QUIC, dport=443)
# bind_bottom_up(UDP, QUIC, sport=443)
# bind_layers(UDP, QUIC, dport=443, sport=443)
# RFC9000 sect 19.19
class QUIC_CONNECTION_CLOSE(Packet):
name = "QUIC - CONNECTION_CLOSE"
fields_desc = [
QuicVarEnumField("type", 0x1C, {
0x1C: "CONNECTION_CLOSE", # transport variant
0x1D: "CONNECTION_CLOSE", # application variant
}),
QuicVarIntField("error_code", 0),
ConditionalField(QuicVarIntField("frame_type", 0), lambda pkt: pkt.type == 0x1C), # Exists only to 0x1C variants.
QuicVarLenField("reason_length", None, length_of="reason_phrase"),
StrLenField("reason_phrase", b"", length_from=lambda pkt: pkt.reason_length),
]
Loading