1313from typing import Optional
1414
1515import dateparser
16+ from cvss .exceptions import CVSS3MalformedError
1617from packageurl import PackageURL
1718from univers .version_range import RANGE_CLASS_BY_SCHEMES
1819from univers .versions import InvalidVersion
19- from univers .versions import PypiVersion
2020from univers .versions import SemverVersion
2121from univers .versions import Version
2222
3131
3232logger = logging .getLogger (__name__ )
3333
34+ PURL_TYPE_BY_OSV_SCHEME = {
35+ "npm" : "npm" ,
36+ "pypi" : "pypi" ,
37+ "maven" : "maven" ,
38+ "nuget" : "nuget" ,
39+ "packagist" : "composer" ,
40+ "rubygems" : "gem" ,
41+ "go" : "golang" ,
42+ "hex" : "hex" ,
43+ "cargo" : "cargo" ,
44+ }
3445
35- def parse_advisory_data (raw_data : dict , supported_ecosystem ) -> Optional [AdvisoryData ]:
46+
47+ def parse_advisory_data (raw_data : dict , supported_ecosystems : List ) -> Optional [AdvisoryData ]:
3648 """
3749 Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
3850 a ``supported_ecosystem`` string.
@@ -54,18 +66,24 @@ def parse_advisory_data(raw_data: dict, supported_ecosystem) -> Optional[Advisor
5466
5567 for affected_pkg in raw_data .get ("affected" ) or []:
5668 purl = get_affected_purl (affected_pkg = affected_pkg , raw_id = raw_id )
57- if purl .type != supported_ecosystem :
69+ if purl .type in PURL_TYPE_BY_OSV_SCHEME :
70+ new_type = PURL_TYPE_BY_OSV_SCHEME [purl .type ]
71+ purl = purl ._replace (type = new_type )
72+
73+ if purl .type not in supported_ecosystems :
5874 logger .error (f"Unsupported package type: { purl !r} in OSV: { raw_id !r} " )
5975 continue
6076
6177 affected_version_range = get_affected_version_range (
6278 affected_pkg = affected_pkg ,
6379 raw_id = raw_id ,
64- supported_ecosystem = supported_ecosystem ,
80+ supported_ecosystem = purl . type ,
6581 )
6682
6783 for fixed_range in affected_pkg .get ("ranges" ) or []:
68- fixed_version = get_fixed_versions (fixed_range = fixed_range , raw_id = raw_id )
84+ fixed_version = get_fixed_versions (
85+ fixed_range = fixed_range , raw_id = raw_id , supported_ecosystem = purl .type
86+ )
6987
7088 for version in fixed_version :
7189 affected_packages .append (
@@ -118,14 +136,21 @@ def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
118136 """
119137 Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
120138 """
121- for severity in raw_data .get ("severity" ) or []:
122- if severity .get ("type" ) == "CVSS_V3" :
123- vector = severity ["score" ]
124- system = SCORING_SYSTEMS ["cvssv3.1" ]
125- score = system .compute (vector )
126- yield VulnerabilitySeverity (system = system , value = score , scoring_elements = vector )
127- else :
128- logger .error (f"Unsupported severity type: { severity !r} for OSV id: { raw_data ['id' ]!r} " )
139+ try :
140+ for severity in raw_data .get ("severity" ) or []:
141+ if severity .get ("type" ) == "CVSS_V3" :
142+ vector = severity ["score" ]
143+ valid_vector = vector [::- 1 ] if vector [- 1 ] == "/" else vector
144+ system = SCORING_SYSTEMS ["cvssv3.1" ]
145+ score = system .compute (valid_vector )
146+ yield VulnerabilitySeverity (system = system , value = score , scoring_elements = vector )
147+
148+ else :
149+ logger .error (
150+ f"Unsupported severity type: { severity !r} for OSV id: { raw_data ['id' ]!r} "
151+ )
152+ except CVSS3MalformedError as e :
153+ logger .error (f"Invalid severity { e } " )
129154
130155 ecosystem_specific = raw_data .get ("ecosystem_specific" ) or {}
131156 severity = ecosystem_specific .get ("severity" )
@@ -204,18 +229,17 @@ def get_affected_version_range(affected_pkg, raw_id, supported_ecosystem):
204229 )
205230
206231
207- def get_fixed_versions (fixed_range , raw_id ) -> List [Version ]:
232+ def get_fixed_versions (fixed_range , raw_id , supported_ecosystem ) -> List [Version ]:
208233 """
209234 Return a list of unique fixed univers Versions given a ``fixed_range``
210235 univers VersionRange and a ``raw_id``.
211-
212236 For example::
213-
214- >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj")
237+ >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
215238 []
216239 >>> get_fixed_versions(
217- ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}]},
218- ... raw_id="GHSA-j3f7-7rmc-6wqj"
240+ ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
241+ ... raw_id="GHSA-j3f7-7rmc-6wqj",
242+ ... supported_ecosystem="pypi",
219243 ... )
220244 [PypiVersion(string='1.7.0')]
221245 """
@@ -226,21 +250,27 @@ def get_fixed_versions(fixed_range, raw_id) -> List[Version]:
226250
227251 fixed_range_type = fixed_range ["type" ]
228252
229- for version in extract_fixed_versions (fixed_range ):
253+ version_range_class = RANGE_CLASS_BY_SCHEMES .get (supported_ecosystem )
254+ version_class = version_range_class .version_class if version_range_class else None
230255
231- # FIXME: ECOSYSTEM does not imply PyPI!!!!
256+ for version in extract_fixed_versions ( fixed_range ):
232257 if fixed_range_type == "ECOSYSTEM" :
233258 try :
234- fixed_versions .append (PypiVersion (version ))
259+ if not version_class :
260+ raise InvalidVersion (
261+ f"Unsupported version for ecosystem: { supported_ecosystem } "
262+ )
263+ fixed_versions .append (version_class (version ))
235264 except InvalidVersion :
236- logger .error (f"Invalid PypiVersion: { version !r} for OSV id: { raw_id !r} " )
265+ logger .error (
266+ f"Invalid version class: { version_class } - { version !r} for OSV id: { raw_id !r} "
267+ )
237268
238269 elif fixed_range_type == "SEMVER" :
239270 try :
240271 fixed_versions .append (SemverVersion (version ))
241272 except InvalidVersion :
242273 logger .error (f"Invalid SemverVersion: { version !r} for OSV id: { raw_id !r} " )
243-
244274 else :
245275 logger .error (f"Unsupported fixed version type: { version !r} for OSV id: { raw_id !r} " )
246276
0 commit comments