-
Notifications
You must be signed in to change notification settings - Fork 49
Use PySequoia to parse signatures & not break on PQC signatures #2255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Don't blow up on encountering PQC signatures. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,6 @@ | ||
| import base64 | ||
| import hashlib | ||
| import fnmatch | ||
| import re | ||
| import subprocess | ||
| import gnupg | ||
| import json | ||
| import logging | ||
| import time | ||
|
|
@@ -15,6 +12,8 @@ | |
| from functools import partial | ||
| from rest_framework.exceptions import Throttled | ||
|
|
||
| from pysequoia.packet import PacketPile, Tag | ||
|
|
||
| from pulpcore.plugin.models import Artifact, Task | ||
| from pulpcore.plugin.util import get_domain | ||
|
|
||
|
|
@@ -32,9 +31,6 @@ | |
| SIGNATURE_SCHEMA, | ||
| ) | ||
|
|
||
| KEY_ID_REGEX_COMPILED = re.compile(r"keyid ([0-9A-F]+)") | ||
| TIMESTAMP_REGEX_COMPILED = re.compile(r"created ([0-9]+)") | ||
|
|
||
| signature_validator = Draft7Validator(SIGNATURE_SCHEMA) | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
@@ -86,6 +82,20 @@ def urlpath_sanitize(*args): | |
| return "/".join(segments) | ||
|
|
||
|
|
||
| def keyid_from_fingerprint(fingerprint): | ||
| """Derive a key ID from an OpenPGP fingerprint. | ||
|
|
||
| For v4 fingerprints (40 hex chars / 20 bytes), the key ID is the last 8 bytes. | ||
| For v6 fingerprints (64 hex chars / 32 bytes), the key ID is the first 8 bytes. | ||
| """ | ||
| if len(fingerprint) == 40: | ||
| return fingerprint[-16:] | ||
| elif len(fingerprint) == 64: | ||
| return fingerprint[:16] | ||
| else: | ||
| raise ValueError(f"Unexpected fingerprint length: {len(fingerprint)}") | ||
|
|
||
|
|
||
| def extract_data_from_signature(signature_raw, man_digest): | ||
| """ | ||
| Extract data from an "integrated" signature, aka a signed non-encrypted document. | ||
|
|
@@ -98,37 +108,56 @@ def extract_data_from_signature(signature_raw, man_digest): | |
| dict: JSON representation of the document and available data about signature | ||
|
|
||
| """ | ||
| gpg = gnupg.GPG() | ||
| crypt_obj = gpg.decrypt(signature_raw, extra_args=["--skip-verify"]) | ||
| if not crypt_obj.data: | ||
| log.info( | ||
| "It is not possible to read the signed document, GPG error: {}".format(crypt_obj.stderr) | ||
| try: | ||
| pile = PacketPile.from_bytes(signature_raw) | ||
| except Exception as exc: | ||
| raise ValueError( | ||
| "Signed document for manifest {} is un-parseable: {}".format(man_digest, str(exc)) | ||
| ) | ||
| return | ||
|
|
||
| literal_data = None | ||
| signing_key_id = None | ||
| signing_key_fingerprint = None | ||
| signature_timestamp = None | ||
|
|
||
| for packet in pile: | ||
| if packet.tag == Tag.Literal: | ||
| literal_data = bytes(packet.literal_data) | ||
| elif packet.tag == Tag.Signature: | ||
dralley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if packet.issuer_key_id is not None: | ||
| signing_key_id = packet.issuer_key_id.upper() | ||
|
Comment on lines
+127
to
+128
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now, not needed. Eventually, yes, because the model needs to add a fingerprint field. So I added a little bit extra just to make the next steps easier. |
||
| elif packet.issuer_fingerprint is not None: | ||
| signing_key_fingerprint = packet.issuer_fingerprint.upper() | ||
| signing_key_id = keyid_from_fingerprint(signing_key_fingerprint) | ||
| else: | ||
| raise ValueError( | ||
| "Signature for manifest {} has no fingerprint or key_id".format(man_digest) | ||
| ) | ||
| if packet.signature_created is not None: | ||
| signature_timestamp = int(packet.signature_created.timestamp()) | ||
|
|
||
| if not literal_data: | ||
| raise ValueError("Signature for manifest {} has no literal data".format(man_digest)) | ||
|
|
||
| try: | ||
| sig_json = json.loads(crypt_obj.data) | ||
| sig_json = json.loads(literal_data) | ||
| except Exception as exc: | ||
| log.info( | ||
| "Signed document cannot be parsed to create a signature for {}." | ||
| " Error: {}".format(man_digest, str(exc)) | ||
| raise ValueError( | ||
| "Signed document cannot be parsed to create a signature for {}. Error: {}".format( | ||
| man_digest, str(exc) | ||
| ) | ||
| ) | ||
| return | ||
|
|
||
| errors = [] | ||
| for error in signature_validator.iter_errors(sig_json): | ||
| errors.append(f'{".".join(error.path)}: {error.message}') | ||
|
|
||
| if errors: | ||
| log.info("The signature for {} is not synced due to: {}".format(man_digest, errors)) | ||
| return | ||
|
|
||
| # decrypted and unverified signatures do not have prepopulated the key_id and timestamp | ||
| # fields; thus, it is necessary to use the debugging utilities of gpg to extract these | ||
| # fields since they are not encrypted and still readable without decrypting the signature first | ||
| packets = subprocess.check_output(["gpg", "--list-packets"], input=signature_raw).decode() | ||
| sig_json["signing_key_id"] = KEY_ID_REGEX_COMPILED.search(packets).group(1) | ||
| sig_json["signature_timestamp"] = TIMESTAMP_REGEX_COMPILED.search(packets).group(1) | ||
| raise ValueError("The signature for {} is not synced due to: {}".format(man_digest, errors)) | ||
|
|
||
| sig_json["signing_key_id"] = signing_key_id | ||
| sig_json["signing_key_fingerprint"] = signing_key_fingerprint | ||
| sig_json["signature_timestamp"] = signature_timestamp | ||
|
|
||
| return sig_json | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ dependencies = [ | |
| "jsonschema>=4.4,<4.27", | ||
| "pulpcore>=3.73.2,<3.115", | ||
| "pyjwt[crypto]>=2.4,<2.13", | ||
| "pysequoia==0.1.32" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also the author promised to switch to semantic versioning after the next release |
||
| ] | ||
|
|
||
| [project.urls] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.