diff --git a/doc/scapy/layers/kerberos.rst b/doc/scapy/layers/kerberos.rst index 2ab19f5db83..a2cfe35ee42 100644 --- a/doc/scapy/layers/kerberos.rst +++ b/doc/scapy/layers/kerberos.rst @@ -238,6 +238,23 @@ As you can see, DMSA keys were imported in the keytab. You can use those as deta No tickets in CCache. +- **Create server keytab:** + +The following is equivalent to Windows' ``ktpass.exe /out kt.keytab /mapuser WKS02$@domain.local /princ host/WKS02.domain.local@domain.local /pass ScapyIsNice``. + +.. code:: pycon + + >>> t = Ticketer() + >>> t.add_cred("host/WKS02.domain.local@domain.local", etypes="all", mapupn="WKS02$@domain.local", password="ScapyIsNice") + Enter password: ************ + >>> t.show() + Keytab name: UNSAVED + Principal Timestamp KVNO Keytype + host/WKS02$.domain.local@domain.local 25/02/26 15:40:27 1 AES256-CTS-HMAC-SHA1-96 + + No tickets in CCache. + >>> t.save_keytab("kt.keytab") + - **Change password using kpasswd in 'set' mode:** .. code:: pycon @@ -370,6 +387,10 @@ Cheat sheet +---------------------------------------+--------------------------------+ | ``t.renew(i, [...])`` | Renew a TGT/ST | +---------------------------------------+--------------------------------+ +| ``t.remove_krb(i)`` | Remove a TGT/ST | ++---------------------------------------+--------------------------------+ +| ``t.set_primary(i)`` | Set the primary ticket | ++---------------------------------------+--------------------------------+ Other useful commands --------------------- diff --git a/scapy/compat.py b/scapy/compat.py index e3e2c2875a1..d8e5c050750 100644 --- a/scapy/compat.py +++ b/scapy/compat.py @@ -3,11 +3,12 @@ # See https://scapy.net/ for more information """ -Python 2 and 3 link classes. +Compatibility module to various older versions of Python """ import base64 import binascii +import enum import struct import sys @@ -39,6 +40,7 @@ 'orb', 'plain_str', 'raw', + 'StrEnum', ] # Typing compatibility @@ -46,7 +48,7 @@ # Note: # supporting typing on multiple python versions is a nightmare. # we provide a FakeType class to be able to use types added on -# later Python versions (since we run mypy on 3.12), on older +# later Python versions (since we run mypy on 3.14), on older # ones. @@ -100,6 +102,15 @@ class Protocol: else: Self = _FakeType("Self") + +# Python 3.11 Only +if sys.version_info >= (3, 11): + from enum import StrEnum +else: + class StrEnum(str, enum.Enum): + pass + + ########### # Python3 # ########### diff --git a/scapy/layers/kerberos.py b/scapy/layers/kerberos.py index a3125382ed5..a282e80998b 100644 --- a/scapy/layers/kerberos.py +++ b/scapy/layers/kerberos.py @@ -3033,7 +3033,9 @@ class KerberosClient(Automaton): :param dmsa: sets the 'unconditional delegation' mode for DMSA TGT retrieval """ - RES_AS_MODE = namedtuple("AS_Result", ["asrep", "sessionkey", "kdcrep", "upn"]) + RES_AS_MODE = namedtuple( + "AS_Result", ["asrep", "sessionkey", "kdcrep", "upn", "pa_type"] + ) RES_TGS_MODE = namedtuple("TGS_Result", ["tgsrep", "sessionkey", "kdcrep", "upn"]) class MODE(IntEnum): @@ -3232,6 +3234,7 @@ def __init__( self.fast_req_sent = False # Session parameters self.pre_auth = False + self.pa_type = None # preauth-type that's used self.fast_rep = None self.fast_error = None self.fast_skey = None # The random subkey used for fast @@ -3574,8 +3577,9 @@ def as_req(self): ) # Build PA-DATA + self.pa_type = 16 # PA-PK-AS-REQ pafactor = PADATA( - padataType=16, # PA-PK-AS-REQ + padataType=self.pa_type, padataValue=PA_PK_AS_REQ( signedAuthpack=signedAuthpack, trustedCertifiers=None, @@ -3596,15 +3600,17 @@ def as_req(self): b"clientchallengearmor", b"challengelongterm", ) + self.pa_type = 138 # PA-ENCRYPTED-CHALLENGE pafactor = PADATA( - padataType=138, # PA-ENCRYPTED-CHALLENGE + padataType=self.pa_type, padataValue=EncryptedData(), ) else: # Usual 'timestamp' factor ts_key = self.key + self.pa_type = 2 # PA-ENC-TIMESTAMP pafactor = PADATA( - padataType=2, # PA-ENC-TIMESTAMP + padataType=self.pa_type, padataValue=EncryptedData(), ) pafactor.padataValue.encrypt( @@ -4078,6 +4084,7 @@ def decrypt_as_rep(self, pkt): res.key.toKey(), res, pkt.root.getUPN(), + self.pa_type, ) @ATMT.receive_condition(SENT_TGS_REQ) diff --git a/scapy/layers/ldap.py b/scapy/layers/ldap.py index 38651dacd45..cdb540ec7da 100644 --- a/scapy/layers/ldap.py +++ b/scapy/layers/ldap.py @@ -26,8 +26,6 @@ import struct import uuid -from enum import Enum - from scapy.arch import get_if_addr from scapy.ansmachine import AnsweringMachine from scapy.asn1.asn1 import ( @@ -62,6 +60,7 @@ ) from scapy.asn1packet import ASN1_Packet from scapy.config import conf +from scapy.compat import StrEnum from scapy.error import log_runtime from scapy.fields import ( FieldLenField, @@ -90,6 +89,7 @@ GSS_C_FLAGS, GSS_C_NO_CHANNEL_BINDINGS, GSS_S_COMPLETE, + GSS_S_CONTINUE_NEEDED, GssChannelBindings, SSP, _GSSAPI_Field, @@ -106,6 +106,7 @@ Any, Dict, List, + Optional, Union, ) @@ -1642,8 +1643,8 @@ def dclocator( ##################### -class LDAP_BIND_MECHS(Enum): - NONE = "UNAUTHENTICATED" +class LDAP_BIND_MECHS(StrEnum): + NONE = "ANONYMOUS" SIMPLE = "SIMPLE" SASL_GSSAPI = "GSSAPI" SASL_GSS_SPNEGO = "GSS-SPNEGO" @@ -1949,8 +1950,8 @@ def bind( self, mech, ssp=None, - sign=False, - encrypt=False, + sign: Optional[bool] = None, + encrypt: Optional[bool] = None, simple_username=None, simple_password=None, ): @@ -1966,6 +1967,12 @@ def bind( : This acts differently based on the :mech: provided during initialization. """ + # Bind default values: if NTLM then encrypt, else sign unless anonymous/simple + if encrypt is None: + encrypt = mech == LDAP_BIND_MECHS.SICILY + if sign is None and not encrypt: + sign = mech not in [LDAP_BIND_MECHS.NONE, LDAP_BIND_MECHS.SIMPLE] + # Store and check consistency self.mech = mech self.ssp = ssp # type: SSP @@ -2000,6 +2007,9 @@ def bind( elif mech in [LDAP_BIND_MECHS.NONE, LDAP_BIND_MECHS.SIMPLE]: if self.sign or self.encrypt: raise ValueError("Cannot use 'sign' or 'encrypt' with NONE or SIMPLE !") + else: + raise ValueError("Mech %s is still unimplemented !" % mech) + if self.ssp is not None and mech in [ LDAP_BIND_MECHS.NONE, LDAP_BIND_MECHS.SIMPLE, @@ -2105,6 +2115,10 @@ def bind( ), chan_bindings=self.chan_bindings, ) + if status not in [GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED]: + raise RuntimeError( + "%s: GSS_Init_sec_context failed !" % self.mech.name, + ) while token: resp = self.sr1( LDAP_BindRequest( @@ -2116,10 +2130,10 @@ def bind( ) ) if not isinstance(resp.protocolOp, LDAP_BindResponse): - if self.verb: - print("%s bind failed !" % self.mech.name) - resp.show() - return + raise LDAP_Exception( + "%s bind failed !" % self.mech.name, + resp=resp, + ) val = resp.protocolOp.serverSaslCredsData if not val: status = resp.protocolOp.resultCode @@ -2195,11 +2209,20 @@ def bind( "GSSAPI SASL failed to negotiate client security flags !", resp=resp, ) + + # If we use SPNEGO and NTLMSSP was used, understand we can't use sign + if self.mech == LDAP_BIND_MECHS.SASL_GSS_SPNEGO: + from scapy.layers.ntlm import NTLMSSP + + if isinstance(self.sspcontext.ssp, NTLMSSP): + self.sign = False + # SASL wrapping is now available. self.sasl_wrap = self.encrypt or self.sign if self.sasl_wrap: self.sock.closed = True # prevent closing by marking it as already closed. self.sock = StreamSocket(self.sock.ins, LDAP_SASL_Buffer) + # Success. if self.verb: print("%s bind succeeded !" % self.mech.name) @@ -2460,3 +2483,4 @@ def close(self): print("X Connection closed\n") self.sock.close() self.bound = False + self.sspcontext = None diff --git a/scapy/layers/smb.py b/scapy/layers/smb.py index 541ab10c292..ded79f15a4d 100644 --- a/scapy/layers/smb.py +++ b/scapy/layers/smb.py @@ -286,7 +286,7 @@ class SMBNegotiate_Request(Packet): bind_layers(SMB_Header, SMBNegotiate_Request, Command=0x72) -# SMBNegociate Protocol Response +# SMBNegotiate Protocol Response def _SMBStrNullField(name, default): diff --git a/scapy/layers/smbclient.py b/scapy/layers/smbclient.py index 61f26e8f8c7..cef9f6e6195 100644 --- a/scapy/layers/smbclient.py +++ b/scapy/layers/smbclient.py @@ -1323,7 +1323,7 @@ def shares_output(self, results): """ print(pretty_list(results, [("ShareName", "ShareType", "Comment")])) - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def use(self, share): """ Open a share @@ -1391,7 +1391,7 @@ def _dir_complete(self, arg): return [results[0] + "\\"] return results - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def ls(self, parent=None): """ List the files in the remote directory @@ -1466,7 +1466,7 @@ def ls_complete(self, folder): return [] return self._dir_complete(folder) - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def cd(self, folder): """ Change the remote current directory @@ -1534,7 +1534,7 @@ def lls_output(self, results): pretty_list(results, [("FileName", "File Size", "Last Modification Time")]) ) - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def lcd(self, folder): """ Change the local current directory @@ -1663,7 +1663,7 @@ def _getr(self, directory, _root, _verb=True): print(conf.color_theme.red(remote), "->", str(ex)) return size - @CLIUtil.addcommand(spaces=True, globsupport=True) + @CLIUtil.addcommand(mono=True, globsupport=True) def get(self, file, _dest=None, _verb=True, *, r=False): """ Retrieve a file @@ -1703,7 +1703,7 @@ def get_complete(self, file): return [] return self._fs_complete(file) - @CLIUtil.addcommand(spaces=True, globsupport=True) + @CLIUtil.addcommand(mono=True, globsupport=True) def cat(self, file): """ Print a file @@ -1731,7 +1731,7 @@ def cat_complete(self, file): return [] return self._fs_complete(file) - @CLIUtil.addcommand(spaces=True, globsupport=True) + @CLIUtil.addcommand(mono=True, globsupport=True) def put(self, file): """ Upload a file @@ -1756,7 +1756,7 @@ def put_complete(self, folder): """ return self._lfs_complete(folder, lambda x: not x.is_dir()) - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def rm(self, file): """ Delete a file @@ -1799,7 +1799,7 @@ def backup(self): print("Backup Intent: On") self.extra_create_options.append("FILE_OPEN_FOR_BACKUP_INTENT") - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def watch(self, folder): """ Watch file changes in folder (recursively) @@ -1826,7 +1826,7 @@ def watch(self, folder): pass print("Cancelled.") - @CLIUtil.addcommand(spaces=True) + @CLIUtil.addcommand(mono=True) def getsd(self, file): """ Get the Security Descriptor diff --git a/scapy/layers/spnego.py b/scapy/layers/spnego.py index a37091313b3..9f498711c0c 100644 --- a/scapy/layers/spnego.py +++ b/scapy/layers/spnego.py @@ -610,10 +610,15 @@ def get_supported_mechtypes(self): for ssp in self.ssps: mechs.extend(ssp.GSS_Inquire_names_for_mech()) - # 2. Sort according to the preference order. - mechs.sort(key=lambda x: self._PREF_ORDER.index(x)) + # 2. Sort according to the selected SSP, then the preference order + selected_mech_oids = ( + self.ssp.GSS_Inquire_names_for_mech() if self.ssp else [] + ) + mechs.sort( + key=lambda x: (x not in selected_mech_oids, self._PREF_ORDER.index(x)) + ) - # 3. Return wrapped in MechType + # 4. Return wrapped in MechType return [SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in mechs] def negotiate_ssp(self) -> None: @@ -793,10 +798,12 @@ def from_cli_arguments( t.open_ccache(ccache) # Look for the ticket that we'll use. We chose: - # - either a ST if the SPN matches our target + # - either a ST if the UPN and SPN matches our target + # - or a ST that matches the UPN # - else a TGT if we got nothing better tgts = [] - for i, (tkt, key, upn, spn) in enumerate(t.iter_tickets()): + sts = [] + for i, (tkt, key, upn, spn) in t.enumerate_tickets(): spn, _ = _parse_spn(spn) spn_host = spn.split("/")[-1] # Check that it's for the correct user @@ -806,14 +813,20 @@ def from_cli_arguments( # TGT. Keep it, and see if we don't have a better ST. tgts.append(t.ssp(i)) elif hostname.lower() == spn_host.lower(): - # ST. We're done ! + # ST. UPN and SPN match. We're done ! ssps.append(t.ssp(i)) break + else: + # ST. UPN matches, Keep it + sts.append(t.ssp(i)) else: - # No ST found + # No perfect ticket found if tgts: # Using a TGT ! ssps.append(tgts[0]) + elif sts: + # Using a ST where at least the UPN matched ! + ssps.append(sts[0]) else: # Nothing found t.show() diff --git a/scapy/modules/ldaphero.py b/scapy/modules/ldaphero.py index 9ade79eb5d9..8e70095922d 100644 --- a/scapy/modules/ldaphero.py +++ b/scapy/modules/ldaphero.py @@ -123,37 +123,75 @@ def run(self) -> False: class LDAPHero: - """ + r""" LDAP Hero - LDAP GUI browser over Scapy's LDAP_Client - :param ssp: the SSP object to use when binding. + :param ssp: if provided, use this SSP for auth. :param mech: the LDAP_BIND_MECHS to use when binding. - :param simple_username: if provided, used for Simple binding (instead of the 'ssp') - :param simple_password: - :param encrypt: request encryption by default (useful when using 'ssp') + :param sign: request signature by default + :param encrypt: request encryption by default :param host: auto-connect to a specific host :param port: the port to connect to (default: 389/636) (This is only in use when using 'host') :param ssl: whether to use SSL to connect or not (This is only in use when using 'host') + + Authentication parameters: + + :param UPN: the upn to use (DOMAIN/USER, DOMAIN\USER, USER@DOMAIN or USER) + :param kerberos_required: require kerberos + :param password: if provided, used for auth + :param HashNt: if provided, used for auth (NTLM) + :param HashAes256Sha96: if provided, used for auth (Kerberos) + :param HashAes128Sha96: if provided, used for auth (Kerberos) """ def __init__( self, ssp: SSP = None, mech: LDAP_BIND_MECHS = None, - simple_username: str = None, - simple_password: str = None, + sign: bool = True, encrypt: bool = False, host: str = None, port: int = None, ssl: bool = False, + # Authentication + UPN: str = None, + password: str = None, + kerberos_required: bool = False, + HashNt: bytes = None, + HashAes256Sha96: bytes = None, + HashAes128Sha96: bytes = None, + use_krb5ccname: bool = False, ): self.client = LDAP_Client() + if ( + ssp is None + and mech in [None, LDAP_BIND_MECHS.SASL_GSS_SPNEGO] + and UPN + and host + ): + # We allow the SSP to be provided through arguments. + # In that case, use SPNEGO + mech = LDAP_BIND_MECHS.SASL_GSS_SPNEGO + ssp = SPNEGOSSP.from_cli_arguments( + UPN=UPN, + target=host, + password=password, + HashNt=HashNt, + HashAes256Sha96=HashAes256Sha96, + HashAes128Sha96=HashAes128Sha96, + kerberos_required=kerberos_required, + use_krb5ccname=use_krb5ccname, + ) self.ssp = ssp self.mech = mech - self.simple_username = simple_username - self.simple_password = simple_password + if mech == LDAP_BIND_MECHS.SIMPLE: + self.simple_username = UPN + self.simple_password = password + else: + self.simple_username = self.simple_password = None + self.sign = sign self.encrypt = encrypt # Session parameters self.connected = False @@ -317,6 +355,7 @@ def bind(self, *args): ssp=self.ssp, simple_username=self.simple_username, simple_password=self.simple_password, + sign=self.sign, encrypt=self.encrypt, ) except LDAP_Exception as ex: @@ -352,45 +391,66 @@ def bind(self, *args): domentry = tk.Entry(dlg, textvariable=domainv) domentry.grid(row=2, column=1) + # The "Bind Type" radio list bindtypefrm = ttk.LabelFrame( dlg, text="Bind type", ) bindtypev = tk.StringVar() - ttk.Radiobutton( + sicilybtn = ttk.Radiobutton( bindtypefrm, variable=bindtypev, text="Sicily bind (NTLM)", value=LDAP_BIND_MECHS.SICILY.value, - ).pack(anchor=tk.W) - ttk.Radiobutton( + ) + sicilybtn.pack(anchor=tk.W) + gssapibtn = ttk.Radiobutton( bindtypefrm, variable=bindtypev, text="GSSAPI bind (Kerberos)", value=LDAP_BIND_MECHS.SASL_GSSAPI.value, - ).pack(anchor=tk.W) - ttk.Radiobutton( + ) + gssapibtn.pack(anchor=tk.W) + spnegobtn = ttk.Radiobutton( bindtypefrm, variable=bindtypev, text="SPNEGO bind (NTLM/Kerberos)", value=LDAP_BIND_MECHS.SASL_GSS_SPNEGO.value, - ).pack(anchor=tk.W) - ttk.Radiobutton( + ) + spnegobtn.pack(anchor=tk.W) + simplebtn = ttk.Radiobutton( bindtypefrm, variable=bindtypev, text="Simple bind", value=LDAP_BIND_MECHS.SIMPLE.value, - ).pack(anchor=tk.W) + ) + simplebtn.pack(anchor=tk.W) bindtypefrm.grid(row=3, column=0, columnspan=2) + if "supportedSASLMechanisms" in self.rootDSE: + # Some algorithms might be unavailable + algs = self.rootDSE["supportedSASLMechanisms"] + if "GSSAPI" not in algs: + gssapibtn.config(state=tk.DISABLED) + if "GSS-SPNEGO" not in algs: + spnegobtn.config(state=tk.DISABLED) + + # Sign button + signv = tk.BooleanVar() + signv.set(self.sign) + ttk.Label(dlg, text="Sign traffic after bind").grid(row=4, column=0) + signbtn = ttk.Checkbutton(dlg, variable=signv) + signbtn.grid(row=4, column=1) + + # Encrypt button encryptv = tk.BooleanVar() encryptv.set(self.encrypt) - ttk.Label(dlg, text="Encrypt traffic after bind").grid(row=4, column=0) + ttk.Label(dlg, text="Encrypt traffic after bind").grid(row=5, column=0) encrbtn = ttk.Checkbutton(dlg, variable=encryptv) - encrbtn.grid(row=4, column=1) + encrbtn.grid(row=5, column=1) - ttk.Button(dlg, text="OK", command=popup.dismiss).grid(row=5, column=0) - ttk.Button(dlg, text="Cancel", command=popup.cancel).grid(row=5, column=1) + ttk.Button(dlg, text="OK", command=popup.dismiss).grid(row=6, column=0) + ttk.Button(dlg, text="Cancel", command=popup.cancel).grid(row=6, column=1) # Default state if self.dns_domain_name and not valid_ip(self.host): @@ -404,16 +464,20 @@ def bindtypechange(*args, **kwargs): bindtype = LDAP_BIND_MECHS(bindtypev.get()) if bindtype == LDAP_BIND_MECHS.SIMPLE: domentry.config(state=tk.DISABLED) + signbtn.config(state=tk.DISABLED) encrbtn.config(state=tk.DISABLED) encryptv.set(False) elif bindtype == LDAP_BIND_MECHS.SICILY: domentry.config(state=tk.DISABLED) + signbtn.config(state=tk.DISABLED) encrbtn.config(state=tk.NORMAL) else: domentry.config(state=tk.NORMAL, textvariable=domainv) + signbtn.config(state=tk.NORMAL) encrbtn.config(state=tk.NORMAL) - bindtypev.trace("w", bindtypechange) + bindtypev.trace_add("write", bindtypechange) + userf.focus() # Setup @@ -426,7 +490,8 @@ def bindtypechange(*args, **kwargs): password = passwordv.get() domain = domainv.get() bindtype = LDAP_BIND_MECHS(bindtypev.get()) - encrypt = encryptv.get() + self.sign = signv.get() + self.encrypt = encryptv.get() # Bind ! self.tprint("client.bind(%s, ...)" % bindtype) @@ -437,8 +502,9 @@ def bindtypechange(*args, **kwargs): self.ssp = None simple_username = username simple_password = password - encrypt = False + self.encrypt = False elif bindtype == LDAP_BIND_MECHS.SICILY: + self.sign = False self.ssp = NTLMSSP( UPN=username, PASSWORD=password, @@ -468,7 +534,8 @@ def bindtypechange(*args, **kwargs): ssp=self.ssp, simple_username=simple_username, simple_password=simple_password, - encrypt=encrypt, + sign=self.sign, + encrypt=self.encrypt, ) except LDAP_Exception as ex: self.tprint( @@ -536,7 +603,11 @@ def treedoubleclick(self, _): Action done on tree double-click. """ # Get clicked item - item = self.tk_tree.selection()[0] + try: + item = self.tk_tree.selection()[0] + except IndexError: + # Nothing is selected + return # Unclickable if self.tk_tree.tag_has("unclickable", item): diff --git a/scapy/modules/ticketer.py b/scapy/modules/ticketer.py index 8f8e9bfdd6c..93755d9198b 100644 --- a/scapy/modules/ticketer.py +++ b/scapy/modules/ticketer.py @@ -270,6 +270,9 @@ def set_from_krb(self, tkt, clientpart, sessionkey, kdcrep): # Set flags self.ticket_flags = int(kdcrep.flags.val, 2) + def is_xcacheconf(self): + return self.server.realm.data == b"X-CACHECONF:" + class CCache(Packet): fields_desc = [ @@ -330,7 +333,7 @@ class KeytabEntry(Packet): MayEnd(PacketField("key", KTKeyBlock(), KTKeyBlock)), ConditionalField( IntField("vno", None), - lambda pkt: "vno" in pkt.fields is not None or pkt.original, + lambda pkt: pkt.fields.get("vno", None) or pkt.original, ), ] @@ -536,20 +539,66 @@ def _to_str(x): print("No tickets in CCache.") return else: + if self.ccache.primary_principal.components: + print("Default principal: %s\n" % self.ccache.primary_principal.toPN()) + print("CCache tickets:") + # 1. Read configuration entries + configuration = collections.defaultdict(dict) + for cred in self.ccache.credentials: + if not cred.is_xcacheconf(): + # Skip non-configuration entries + continue + + if ( + len(cred.server.components) not in [2, 3] + or cred.server.components[0].data != b"krb5_ccache_conf_data" + ): + print("Skipping invalid X-CACHECONF !") + continue + + # Get all the values from this weird format + cname = cred.client.toPN() + key = cred.server.components[1].data.decode() + if len(cred.server.components) == 3: + sname = cred.server.components[2].data.decode() + else: + sname = None + value = cred.ticket.data.decode() + + # Store for this cname -> sname, the following 'key' setting. + configuration[(cname, sname)][key] = value + + # 2. Read credentials for i, cred in enumerate(self.ccache.credentials): - if cred.keyblock.keytype == 0: + if cred.is_xcacheconf(): + # Skip configuration entries continue + + # Get client and server principals + cname = cred.client.toPN() + sname = cred.server.toPN() + print( "%s. %s -> %s" % ( i, - cred.client.toPN(), - cred.server.toPN(), + cname, + sname, ) ) print(cred.sprintf(" %ticket_flags%")) + # If configuration entries match, show the settings here + print( + " " + + " ".join( + "%s=%s" % (key, value) + for _sname in [sname, None] + for key, value in configuration[(cname, _sname)].items() + if (cname, _sname) in configuration + ) + ) print( pretty_list( [ @@ -660,8 +709,30 @@ def remove_krb(self, i): :param i: the ticket to remove. """ + cred = self.ccache.credentials[i] + xcacheconfs = self.get_krb_xcacheopts(i) + + # Delete from the store del self.ccache.credentials[i] + # Among the remaining, do we have an option that's identical in name? + if any( + not xcred.is_xcacheconf() + and xcred.client.toPN() == cred.client.toPN() + and xcred.server.toPN() == cred.server.toPN() + for xcred in self.ccache.credentials + ): + # There is another ticket with the same client and server names. Stop here + return + + # There isno ticket exactly the same, remove all the xcacheconf that match + for xcred in xcacheconfs: + self.ccache.credentials.remove(xcred) + + # If this was the primary principal, remove from there + if cred.client.toPN() == self.ccache.primary_principal.toPN(): + self.ccache.primary_principal = CCPrincipal() + def import_krb(self, res, key=None, hash=None, _inplace=None): """ Import the result of krb_[tgs/as]_req or a Ticket into the CCache. @@ -676,6 +747,7 @@ def import_krb(self, res, key=None, hash=None, _inplace=None): cred = CCCredential() # Update the cred + xcacheconfs = {} if isinstance(res, KRB_Ticket): if key is None: key = self._prompt_hash( @@ -693,6 +765,11 @@ def import_krb(self, res, key=None, hash=None, _inplace=None): else: if isinstance(res, KerberosClient.RES_AS_MODE): rep = res.asrep + pa_type = res.pa_type + if pa_type is not None: + xcacheconfs["pa_type"] = str(pa_type) + if pa_type in [138]: + xcacheconfs["fast_avail"] = "yes" elif isinstance(res, KerberosClient.RES_TGS_MODE): rep = res.tgsrep @@ -712,6 +789,7 @@ def import_krb(self, res, key=None, hash=None, _inplace=None): ) else: raise ValueError("Unknown type of obj !") + cred.set_from_krb( rep.ticket, rep, @@ -721,7 +799,78 @@ def import_krb(self, res, key=None, hash=None, _inplace=None): # Append to ccache if _inplace is None: - self.ccache.credentials.append(cred) + _inplace = sum( + 1 for xcred in self.ccache.credentials if not xcred.is_xcacheconf() + ) + self.ccache.credentials.insert(_inplace, cred) + + # If this is the first credential, set it to primary + if len(self.ccache.credentials) == 1: + self.set_primary(_inplace) + + # For MIT kinit to be happy, we must provide extra options for the credential + for key, value in xcacheconfs.items(): + self.set_krb_xcacheconf(_inplace, key, value) + + def set_primary(self, i): + """ + Set the primary (=default) credential to the credential n°1 + """ + self.ccache.primary_principal = self.ccache.credentials[i].client + + def get_krb_xcacheopts(self, i: int): + """ + Get the X-CACHECONF config for a credential + """ + cred = self.ccache.credentials[i] + cname = cred.client.toPN() + sname = cred.server.toPN().encode() + return [ + xcred + for xcred in self.ccache.credentials + if ( + xcred.is_xcacheconf() + and xcred.client.toPN() == cname + and ( + len(xcred.server.components) == 2 + or xcred.server.components[2].data == sname + ) + ) + ] + + def set_krb_xcacheconf(self, i: int, key: str, value: str): + """ + Set a X-CACHECONF config for a credential + """ + key = key.encode() + value = value.encode() + cred = self.ccache.credentials[i] + sname = cred.server.toPN().encode() + + # First we look for a potential credential, if present + try: + conf_cred = next( + xcred + for xcred in self.get_krb_xcacheopts(i) + if xcred.server.components[1].data == key + ) + except StopIteration: + conf_cred = CCCredential( + client=cred.client, + server=CCPrincipal( + name_type=1, + realm=CCCountedOctetString(data=b"X-CACHECONF:"), + components=[ + CCCountedOctetString(data=b"krb5_ccache_conf_data"), + CCCountedOctetString(data=key), + CCCountedOctetString(data=sname), + ], + ), + ) + self.ccache.credentials.append(conf_cred) + + # Set value + conf_cred.ticket = CCCountedOctetString(data=value) def export_krb(self, i): """ @@ -764,17 +913,22 @@ def add_cred( # Detect if principal is a SPN or UPN and parse realm. realm = None - component = None + princname = None try: - component, realm = _parse_upn(principal) + _, realm = _parse_upn(principal) if salt is None and key is None: salt = krb_get_salt(principal) + princname = PrincipalName.fromSPN(principal) except ValueError: try: - component, realm = _parse_spn(principal) + _, realm = _parse_spn(principal) + princname = PrincipalName.fromSPN(principal) except ValueError: raise ValueError("Invalid principal ! (must be UPN or SPN)") + if not realm: + raise ValueError("Must provide the realm in the principal ! (with @DOMAIN)") + if salt is None and key is None: raise ValueError( "Salt could not be guessed. Please provide it, or provide 'mapupn' " @@ -821,17 +975,18 @@ def add_cred( ), components=[ KTCountedOctetString( - data=x, + data=x.val, ) - for x in component.split("/") + for x in princname.nameString ], timestamp=int(datetime.now().timestamp()), - vno8=kvno if kvno < 256 else None, + name_type=princname.nameType.val, + vno8=kvno, key=KTKeyBlock( keytype=key.etype, keyvalue=key.key, ), - vno=None if kvno < 256 else kvno, + vno=kvno, _parent=self.keytab, ) ) @@ -850,6 +1005,16 @@ def get_cred(self, principal, etype=None): "Note principals are case sensitive, as on ktpass.exe" ) + def remove_cred(self, principal, etype=None): + """ + Remove a credential from the Keytab by principal. + """ + for i, entry in enumerate(self.keytab.entries): + if entry.getPrincipal() == principal: + if etype is not None and etype != entry.key.keytype: + continue + del self.keytab.entries[i] + def ssp(self, i, **kwargs): """ Create a KerberosSSP from a ticket or from the keystore. @@ -2590,9 +2755,18 @@ def renew(self, i, ip=None, additional_tickets=[], **kwargs): self.import_krb(res, _inplace=i) + def enumerate_tickets(self): + """ + Enumerate through the tickets in the ccache + """ + for i, cred in enumerate(self.ccache.credentials): + if cred.is_xcacheconf(): + continue + yield i, self.export_krb(i) + def iter_tickets(self): """ Iterate through the tickets in the ccache """ - for i in range(len(self.ccache.credentials)): - yield self.export_krb(i) + for _, tkt in self.enumerate_tickets(): + yield tkt diff --git a/scapy/utils.py b/scapy/utils.py index 9b7ad90537d..2aea911462d 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -57,20 +57,20 @@ # Typing imports from typing import ( - cast, Any, AnyStr, Callable, + cast, Dict, IO, Iterator, List, Optional, - TYPE_CHECKING, + overload, Tuple, + TYPE_CHECKING, Type, Union, - overload, ) from scapy.compat import ( DecoratorCallable, @@ -3676,19 +3676,22 @@ def _parseallargs( @classmethod def addcommand( cls, - spaces: bool = False, + mono: bool = False, globsupport: bool = False, ) -> Callable[[DecoratorCallable], DecoratorCallable]: """ Decorator to register a command + + :param mono: if True, the command takes a single argument even + if there are spaces. """ def func(cmd: DecoratorCallable) -> DecoratorCallable: cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND # type: ignore - cmd._spaces = spaces # type: ignore + cmd._mono = mono # type: ignore cmd._globsupport = globsupport # type: ignore cls._inspectkwargs(cmd) - if cmd._globsupport and not cmd._spaces: # type: ignore - raise ValueError("Cannot use globsupport without spaces.") + if cmd._globsupport and not cmd._mono: # type: ignore + raise ValueError("Cannot use globsupport without mono.") return cmd return func @@ -3705,13 +3708,17 @@ def func(processor: DecoratorCallable) -> DecoratorCallable: return func @classmethod - def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 + def addcomplete( + cls, + cmd: DecoratorCallable, + ) -> Callable[[DecoratorCallable], DecoratorCallable]: """ Decorator to register a command completor """ def func(processor: DecoratorCallable) -> DecoratorCallable: processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE # type: ignore processor.cliutil_ref = cmd # type: ignore + processor._mono = cmd._mono # type: ignore return processor return func @@ -3782,6 +3789,40 @@ def _args(func: Any) -> str: ) ) + def _split_cmd(self, cmd: str) -> Tuple[List[str], List[int]]: + """ + Split the command in multiple arguments + """ + quoted = None + queue = [""] + offsets = [0] + for i, c in enumerate(cmd): + if c == "'" or c == '"': + # This is a quote. + if quoted is not None and quoted == c: + # We are closing the last quote + quoted = None + elif quoted: + queue[-1] += c + else: + quoted = c + elif c == " ": + # This is a space. + if quoted is not None: + # We're in a quote, append it + queue[-1] += c + elif queue[-1]: + # Not in a quote, this splits the argument. + queue += [""] + offsets.append(i) + else: + # Padding space, advance offset + offsets[-1] += 1 + else: + # This is a char + queue[-1] += c + return queue, offsets + def _completer(self) -> 'prompt_toolkit.completion.Completer': """ Returns a prompt_toolkit custom completer @@ -3793,7 +3834,7 @@ def get_completions(cmpl, document, complete_event): # type: ignore if not complete_event.completion_requested: # Only activate when the user does return - parts = document.text.split(" ") + parts, offsets = self._split_cmd(document.text) cmd = parts[0].lower() if cmd not in self.commands: # We are trying to complete the command @@ -3804,10 +3845,30 @@ def get_completions(cmpl, document, complete_event): # type: ignore if len(parts) == 1: return args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:]) - arg = " ".join(args) if cmd in self.commands_complete: - for possible_arg in self.commands_complete[cmd](self, arg): - yield Completion(possible_arg, start_position=-len(arg)) + completer = self.commands_complete[cmd] + # If the completion is 'mono', it's a single argument with + # spaces. Else we pass the list of arguments to complete, + # and we only complete the last argument. + if completer._mono: # type: ignore + arg = " ".join(args) + completions = completer(self, arg) + startpos = offsets[1] + else: + completions = completer(self, args) + startpos = offsets[-1] + + # For each possible completion + for possible_arg in completions: + # If there's a space in the completion, and we're + # not in mono mode, add quotes. + if " " in possible_arg and not completer._mono: # type: ignore # noqa: E501 + possible_arg = '"%s"' % possible_arg + + yield Completion( + possible_arg, + start_position=startpos - len(document.text) + 1 + ) return return CLICompleter() @@ -3826,8 +3887,9 @@ def loop(self, debug: int = 0) -> None: except EOFError: self.close() break - args = cmd.split(" ")[1:] - cmd = cmd.split(" ")[0].strip().lower() + parts, _ = self._split_cmd(cmd) + args = parts[1:] + cmd = parts[0].strip().lower() if not cmd: continue if cmd in ["help", "h", "?"]: @@ -3841,7 +3903,7 @@ def loop(self, debug: int = 0) -> None: # check the number of arguments func = self.commands[cmd] args, kwargs, outkwargs = self._parseallargs(func, cmd, args) - if func._spaces: # type: ignore + if func._mono: # type: ignore args = [" ".join(args)] # if globsupport is set, we might need to do several calls if func._globsupport and "*" in args[0]: # type: ignore @@ -3944,6 +4006,12 @@ def AutoArgparse( hexarguments.append(parname) elif param.annotation in [str, int, float]: paramkwargs["type"] = param.annotation + elif ( + isinstance(param.annotation, type) and + issubclass(param.annotation, enum.Enum) + ): + paramkwargs["type"] = param.annotation + paramkwargs["choices"] = list(param.annotation) else: continue if param.default != inspect.Parameter.empty: diff --git a/test/scapy/layers/kerberos.uts b/test/scapy/layers/kerberos.uts index 9f6607bb076..e7805b35c99 100644 --- a/test/scapy/layers/kerberos.uts +++ b/test/scapy/layers/kerberos.uts @@ -1037,9 +1037,12 @@ with ContextManagerCaptureOutput() as cmco: print(outp) assert outp == """ +Default principal: Administrator@DOMAIN.LOCAL + CCache tickets: 0. Administrator@DOMAIN.LOCAL -> krbtgt/DOMAIN.LOCAL@DOMAIN.LOCAL canonicalize+pre-authent+initial+renewable+proxiable+forwardable + pa_type=2 Start time End time Renew until Auth time 27/09/22 17:29:30 28/09/22 03:29:30 30/09/22 17:29:27 27/09/22 17:29:30 """.strip() @@ -1062,9 +1065,10 @@ assert RESULT == EXPECTED_CCACHE_DATA = Ticketer++ - Import ticket TKT = KRB_Ticket(bytes.fromhex("618204b3308204afa003020105a10e1b0c444f4d41494e2e4c4f43414ca221301fa003020102a11830161b066b72627467741b0c444f4d41494e2e4c4f43414ca38204733082046fa003020112a103020103a28204610482045dbd10c11e1def682dc3607c98db0806acf2809a1f8c73fda44f86c14bd039c4c95a41ed400ac4e558970c51316ffdf34bd695a636bcb1e5074419d083e918085ec56ff77af9f6a410faff3b9859a635184486c83521b5390ec724185057e3e62843a92d9ba500dd24d9ebeff0654fe459cf35d9607b11f7c35bf6ba4dd378fd5c99554650296abcc374c3ff2fcf807038848f351e9134f69726b5e92aec99e4aa99613c35609b0094b533811513e9ba48b9113f0f2b4dbcf9e05a6668c998c09f65ae48c8ea1b7fbc62b5cbbec7decc0a4832df93aec08c138a63621f8c584a8530a380b54b37fdb8dda6924e4260710cf8b66c71479dcb6916790c5c582b9953cab7085178e280d182a74f93fcd3bc83a0dc26284551a4d230a50a8b341de132fdf0f97bb7abdec48021e04c3deda89897c684d5603636bd66842ed4b2586f8e09fbb5e0228bcce3e5ffc82e5674f16a65a4f1b7b17b3854a5465734a5fec573c54526f27b9ea8a64646f01268b040d09f2acda82a37fb195cb24f8c1092919574999fd61d859aed2af5a9457a20a72e6188c0d813cb12713779f84f7bed298e2cd793b06e639d859b4fb3a5f746e2023bcf0627a8a87425899aa3a9b63f558965eccabc35330562b055426e2fc6808c456ee8f047d09a7021b6a4f2547cde6552224b294750efd492ea0745035f76a394d5b6e26442e5542b4d557722ee21b70c05567241ed97dffb31502d950c50462f478fccd8454ec38424688e87c4428c3763b369f1b51509ef36548dcf7a5c842475aa65bec10d6f86cecd90e4694f36d68052b55a2715c00e269c215071311482118ed0168fabb3053ad59dcdf42a42502685cdfcc679d2272dd12ab658ff8588b34cb48b3aef4a1961694ab2b31a812a683015ed343a8c21498997b0ded3767f73e069c9633845b582d6f1a987d6b09d31b330a3cbf2c430fb6f5d6fa27f83d9624b7bb8cebc248933b68dbe1b6b2822b96621159d9249ded893cbedcf1fc5ee77cb69695852170b24ea2f36aa898a24212b2edf84459a4381bd243797b9a3281d7e1b280f6add79dbb1cc5d887178d0813549a168a38be441bb387764098c4e7bed81f7973ee19e733767a4dd05212a18b12c838c674c18b0d6304a28be3de7928ffdd1449d297884c6a6a574b13a0d289425c1ebf37c5af56d04753fcc0c02fdcc98427fb9aa33510905ba2b6746a8b59742e4243f6fba814585b122794a54aecba3ea956a0c85fded2582cb4809ee7be471253f0256503636e81f35df38b177c3c071677e1dd9efa6b10c6a122ab0522f2b10e8b625355f5c1e7996c7055237182691ede31a5e602966f90c2a66bdf997872dbdc97155d723bc1fb187bd0f42cbcdedbe2c5717d13e27e2134ac6cd9d3a53cd215344a8278065da4eea7544860eda5fdb41f849ff7c1db775f7a0a62d2875b43b55bc091e8056666507dfcaded40a83211db7a5856d4c9b5e2ef862830cef8a4c36ce034e9a9e11f558f008cdbe4152081c30dae53b6de44e1703236490cfc87be9e96fa0679f87255069994a262d61d57be0382fe9e570")) +TKT_SKEY = bytes.fromhex("dd4e16dbcfe19d82cb6fc9b593bb7449c1d8a46687dc20c295ed0e51cc4c3d0d") t = Ticketer() -t.import_krb(TKT, hash=bytes.fromhex("dd4e16dbcfe19d82cb6fc9b593bb7449c1d8a46687dc20c295ed0e51cc4c3d0d")) +t.import_krb(TKT, hash=TKT_SKEY) tkt, _, upn, spn = t.export_krb(0) hexdiff(tkt, TKT) @@ -1075,13 +1079,13 @@ assert spn == 'krbtgt/DOMAIN.LOCAL@DOMAIN.LOCAL' = Ticketer++ - Create keytab t = Ticketer() -t.add_cred("host/dc.domain.local", password="Scapy1", salt=b"salt") +t.add_cred("host/dc.domain.local@domain.local", password="Scapy1", salt=b"salt") -assert t.get_cred("host/dc.domain.local").key == bytes.fromhex("811f44006ad73972ffec42cc89ce6e79749e6effd8db4db5fb0f38c0f3fa6f4f") +assert t.get_cred("host/dc.domain.local@domain.local").key == bytes.fromhex("811f44006ad73972ffec42cc89ce6e79749e6effd8db4db5fb0f38c0f3fa6f4f") = Ticketer++ - Get SPN ssp -ssp = t.ssp("host/dc.domain.local") +ssp = t.ssp("host/dc.domain.local@domain.local") = Ticketer++ - Load keytab @@ -1130,6 +1134,48 @@ TICKETER_TEMPFILE = get_temp_file() t.save_keytab(TICKETER_TEMPFILE) + += Ticketer++ - Real example - Import ASREP + +t = Ticketer() + +asrep = KerberosClient.RES_AS_MODE( + asrep=KRB_AS_REP(bytes.fromhex('6b82083c30820838a003020105a10302010ba2820174308201703082016ca10402020088a28201620482015ea082015a30820156a08201523082014ea003020112a2820145048201414722f301958ad09a272342ec03ec5f04b76de456b73ab2684d49a2ba9ddd900199e0cee8dff6bcc573d30def6aeec4c39385b7ecea55b4d096a8fe5fecac0cf8122f710bb4b69953ecc35954a4e5f7ac84d73f17b290aac1e6cad32a58fb6db7d0ff1d816e40c34375f18d69af15a243b6652e4630b4b80f4c94f5b8ae7aecb199ca3d25c69600df88b6e7624feb3345e872543d537b403073e8dcb80310e8ca45fed3d7f53db440b4b7d55299721dfe620a2e55dbf5abc8c9219854df02700af0b1e7117a62d402b10ec336df6de09fb594ebf96a5957849dbdb7add039c6f5e9ed10cfd93b621b33b5f3c27c7d84f731f3a8e10b1ed39bcd04cdfba41452e85b0a5650b0011486f3137057ad7c09d56f3509a7efd6bc66c49e9a30b3a63c26b24b0575e06cd1be22c99df6baf3413d5da09e7d41cee2c9dc4b0014623dd7014ba30e1b0c444f4d41494e2e4c4f43414ca41a3018a003020101a111300f1b0d41646d696e6973747261746f72a58205106182050c30820508a003020105a10e1b0c444f4d41494e2e4c4f43414ca221301fa003020102a11830161b066b72627467741b0c444f4d41494e2e4c4f43414ca38204cc308204c8a003020112a103020102a28204ba048204b6156ec9c868912ff7af63ced40bc8492a7a505250a0a93f2ba924f4634cf488dc6212994fdb4a95bc94169f3872c50a628f3ff25975c8e575b3c179364a62e3a6b38b514960eb9f04ecded7c173cf9aeb4ebe3b39e9c80c8acdd41ed26caae83881666a4304b42a37de4bb6bd40859fc9cffd5fb5821809da0319aa0b2a35b98a0a77df53a9b3d47ecaacc8acdf404ac7b16247dd94e15122b709d4a44da741f32fe05e6a77b74e6b63993b85aa004cd38714ed48399f75a5f0c140a8555b6cc043b3226cd58f74eb70921220a0e906b5a8292e589d38685e6782abc4d159822787e9f43194e11e27a514c62dbcd71f8349dd6d556814ea77687c46e2238bc061671d68ef1d3e3091a736ddad27eec84fc2b41b115c18b02f92828311950564661f3023c73d75a6b20aaa223bdeace648e815ab8ffaf943cadd4113f456960f8a903dceeea0f59826299c752287c1ecd188a4c83adbc3054ba4f4b91537710bc8c242d8820dd62a46c48a81619abb959f9eb8db5ba64ad5121ace5cee443dbcc7a25b8dd0fede9d77b1051a8c96a39c852402bd020052df0780eea63f4b6b8b2c7f63749602fcc92d123d5e2c1a85339d942bff57939967b99e159901b1efb05da164736329dc8fdce78a3bb1e0f449627870380bc4d28de38ab11bca95d6317fa64f33a7be46be25cd560425b0b1b1b7d0b6873fe44d2d41db26f2c2ddd6dc5b0f707f12fa60d1e65516c120dd2adc2ac9f7ad35986f34f050ccc08611a0641f62adafdd68e206730a8b484352bb99334796a0212b683a6fca76bf7fc57e264ecb5ff8d7ed76163f7554c112d6666b885eecc6c10257d7e481544c81df7f4f5878aab1936ecff57830335c1b37a20f32a63303be325e3fa54c1f7561663925230fe40f386f63457e98f5a5fe92b6241262fa9783c60d1195b3124ae3642c7c3c9f9bd6f4b1d5220e4fff22c6c2eb4f5cf4d08b5359e77b608f6a2962050a039280e98cecf5e5bb613b7e18ca557103b30ab4a31fcdda26a1fec01653569295351faa0c8dfafcc77bf745507031d10c3ddefdde29aee9f932e1126527f2b47209a83aa20bcfab1ca9a176f52eaaf08948deacf9981979c422a82fdc23473763ca6571b50702bb13d67c7fdddc4dbe320e47cc75f7c12fa3f7eb39bfde9995c2984ed8f4de09786d4745281a58f4750f9752ba3040c16ff8e4a0bcaf4216f4261df823e0cf7d1d8319cc546de50fbc53e78bed55feadcd86cd088d9c74b76c7db6768a1c3cad159d8f928a0c6f7d084f3ef2e1e77dd3ebbc32149ac582fb443deeb865e781eee59bb8241239369b229d53d6dce6b40a4722504097cfc73f93ed3bb7d6a52e2f47a7bd3e330268122fc02d21d400f1f1292c32cdd0f0d748ff5941a98eb048e7e645141b09c55fb266c086b1765ec90032443c5f99fc8fff9d2e9561f1b70f30369d4e818a879f6eacb357c44d1411008b7706b4adf02a5b5471069f2b2f5fe3c292a7e2d00d8570b1755b6349582eeafc45be4b352d10bb8e2914016489fa6d427d8bd45abd67bc88612ce6a45a46a5aede05743b79196f84fc37455968f8b1095ec48897f671f375d1d1e5296aa7831d4148a270ef6496bfa98dccfbddeef5f9166d83bbd1caf5e7d93f981e70e3df067437f6fe1925a26c9b078e76aebc05b50ac613ebba3012d57d711d8dc60e7ceba68201723082016ea003020112a103020101a28201600482015c3c1e391355e9dbd061b723b597afd5a10aff251e65b58295729849c5160e11987ae9aec03c1aa5d9c26906479cabab031740eea870f7bb5423f1936f0ed2aed1fa6dfcd9d6ba373220c65635b2d52c00fe5f73c8ed5a3857e135d6eb10a8ffd47653abb60b8b93d7947e4b5d2cff7fa1374958b928affba651a5f6b34dcab3c3297919c181fc2807e8cb353fb8a6ebeacd2020cdd327ab3e53045f242c493f88aecf82ff05051789baa551a93d2aaf9fc1596a33ea1f8d54b557f74747918c2f970040e2ed9cbdf52c172e0a87e5e795ce6c80705cc2bfaa156b5998e481b2e57e7ff0d1501ff5aa3a7cbad586ca0250181d7fd2d0f7b755b5bf202d1bff510dacccea27bc6f608a91919e3a4dfa5d2f621b0721f8e44d3741336603654eab1e2dcabbe8517545542944646b5867153d7ba7212de28ddb6be1e1166acaee7715df82273ff6ea62f33337a7f4250a3d0812fa5b57c7164a2ec651e15a')), + sessionkey=Key(EncryptionType.AES256_CTS_HMAC_SHA1_96, bytes.fromhex('1051fc76a7a5bf42d2a35dae808c74cbd7c5d7c3bca8dba775db8426058ecb62')), + kdcrep=EncASRepPart(bytes.fromhex('7982013c30820138a02b3029a003020112a12204201051fc76a7a5bf42d2a35dae808c74cbd7c5d7c3bca8dba775db8426058ecb62a11c301a3018a003020100a111180f32303236303232353232333731315aa20602043328586ba311180f32313030303931343032343830355aa40703050040e10000a511180f32303236303232353232333731315aa611180f32303236303232353232333731315aa711180f32303236303232363038333731315aa811180f32303236303232363038333731325aa90e1b0c444f4d41494e2e4c4f43414caa21301fa003020102a11830161b066b72627467741b0c444f4d41494e2e4c4f43414cab1d301b3019a003020114a112041057494e31302020202020202020202020ac2930273015a104020200a7a20d040b3009a00703050080000000300ea104020200a5a206040400000500')), + upn='Administrator@DOMAIN.LOCAL', + pa_type=138, +) +t.import_krb(asrep) + += Ticketer++ - Real example - Check X-CACHECONF + +xcacheconfs = t.get_krb_xcacheopts(0) + +assert len(xcacheconfs) == 2 +assert all(x.client.toPN() == 'Administrator@DOMAIN.LOCAL' for x in xcacheconfs) +assert all(x.server.components[0].data == b"krb5_ccache_conf_data" for x in xcacheconfs) +assert all(x.server.components[2].data == b'krbtgt/DOMAIN.LOCAL@DOMAIN.LOCAL' for x in xcacheconfs) +assert xcacheconfs[0].server.components[1].data == b"pa_type" +assert xcacheconfs[1].server.components[1].data == b"fast_avail" +assert xcacheconfs[0].ticket.data == b'138' +assert xcacheconfs[1].ticket.data == b'yes' + += Ticketer++ - Real example - Check primary_principal + +assert t.ccache.primary_principal.toPN() == 'Administrator@DOMAIN.LOCAL' +assert t.ccache.primary_principal.name_type == 1 + += Ticketer++ - Real example - Check iter_tickets + +assert len(list(t.iter_tickets())) == 1 +assert len(t.ccache.credentials) == 3 + += Ticketer++ - Real example - Test remove_krb + +t.remove_krb(0) +assert len(t.ccache.credentials) == 0 + + Crypto tests = RFC3691 - Test vectors for KRB-FX-CF2 diff --git a/test/scapy/layers/smb.uts b/test/scapy/layers/smb.uts index 1b90f1c348d..7142658b17d 100644 --- a/test/scapy/layers/smb.uts +++ b/test/scapy/layers/smb.uts @@ -32,11 +32,11 @@ assert TCP in pkt assert NBTSession in pkt assert pkt[NBTSession].LENGTH == 47 assert _SMBGeneric in pkt -# Should not have a proper SMBNegociate header as magic is \xf0SMB, not \xffSMB +# Should not have a proper SMBNegotiate header as magic is \xf0SMB, not \xffSMB assert SMB_Header not in pkt -= test SMB Negociate Header - assemble += test SMB Negotiate Header - assemble pkt = IP() / TCP() / NBTSession() / SMB_Header() / SMBNegotiate_Request() pkt = IP(raw(pkt))