Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
from vulnerabilities.models import CodeFixV2
from vulnerabilities.models import ImpactedPackage
from vulnerabilities.models import Package
from vulnerabilities.models import PackageCommitPatch
from vulnerabilities.models import PackageV2
from vulnerabilities.models import Patch
from vulnerabilities.models import PipelineRun
from vulnerabilities.models import PipelineSchedule
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.models import Weakness
from vulnerabilities.throttling import PermissionBasedUserRateThrottle
from vulnerabilities.utils import get_patch_url
from vulnerabilities.utils import group_advisories_by_content


Expand Down Expand Up @@ -333,20 +336,49 @@ def get_fixing_vulnerabilities(self, obj):
return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()]


class PackageCommitPatchSerializer(serializers.ModelSerializer):
patch_url = serializers.SerializerMethodField()

class Meta:
model = PackageCommitPatch
fields = [
"id",
"commit_hash",
"vcs_url",
"patch_url",
]

def get_patch_url(self, obj):
return get_patch_url(obj.vcs_url, obj.commit_hash)


class PatchSerializer(serializers.ModelSerializer):
class Meta:
model = Patch
fields = [
"id",
"patch_url",
]


class PackageV3Serializer(serializers.ModelSerializer):
purl = serializers.CharField(source="package_url")
risk_score = serializers.FloatField(read_only=True)
affected_by_vulnerabilities = serializers.SerializerMethodField()
fixing_vulnerabilities = serializers.SerializerMethodField()
next_non_vulnerable_version = serializers.SerializerMethodField()
latest_non_vulnerable_version = serializers.SerializerMethodField()
introduced_by_package_commit_patches = serializers.SerializerMethodField()
fixed_by_package_commit_patches = serializers.SerializerMethodField()

class Meta:
model = Package
fields = [
"purl",
"affected_by_vulnerabilities",
"fixing_vulnerabilities",
"introduced_by_package_commit_patches",
"fixed_by_package_commit_patches",
"next_non_vulnerable_version",
"latest_non_vulnerable_version",
"risk_score",
Expand Down Expand Up @@ -425,6 +457,98 @@ def get_fixing_vulnerabilities(self, package):

return result

def get_introduced_by_package_commit_patches(self, package):
impacts = package.affected_in_impacts.select_related("advisory").prefetch_related(
"introduced_by_package_commit_patches"
)

avids = {impact.advisory.avid for impact in impacts if impact.advisory_id}
if not avids:
return []

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)
advisory_by_avid = {adv.avid: adv for adv in latest_advisories}
impact_by_avid = {}

advisories = []
for impact in impacts:
avid = impact.advisory.avid
advisory = advisory_by_avid.get(avid)
if not advisory:
continue
advisories.append(advisory)
impact_by_avid[avid] = impact

grouped_advisories = group_advisories_by_content(advisories=advisories)

result = []
for advisory_group in grouped_advisories.values():
primary_advisory = advisory_group["primary"]
avid = primary_advisory.avid
impact = impact_by_avid.get(avid)

if not impact:
continue

patches = impact.introduced_by_package_commit_patches.all()
if not patches:
continue

result.append(
{
"advisory_id": primary_advisory.avid,
"duplicate_advisory_ids": [adv.avid for adv in advisory_group["secondary"]],
"commit_patches": [patch.to_dict() for patch in patches],
}
)

return result

def get_fixed_by_package_commit_patches(self, package):
impacts = package.affected_in_impacts.select_related("advisory").prefetch_related(
"fixed_by_package_commit_patches"
)

avids = {impact.advisory.avid for impact in impacts if impact.advisory_id}
if not avids:
return []

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)
advisory_by_avid = {adv.avid: adv for adv in latest_advisories}
impact_by_avid = {}

advisories = []
for impact in impacts:
avid = impact.advisory.avid
if advisory := advisory_by_avid.get(avid):
advisories.append(advisory)
impact_by_avid[avid] = impact

grouped_advisories = group_advisories_by_content(advisories=advisories)

result = []
for advisory_group in grouped_advisories.values():
primary_advisory = advisory_group["primary"]
impact = impact_by_avid.get(primary_advisory.avid)

if not impact:
continue

# Query the fixing patches instead
patches = impact.fixed_by_package_commit_patches.all()
if not patches:
continue

result.append(
{
"advisory_id": primary_advisory.avid,
"duplicate_advisory_ids": [adv.avid for adv in advisory_group["secondary"]],
"commit_patches": [patch.to_dict() for patch in patches],
}
)

return result

def get_next_non_vulnerable_version(self, package):
if next_non_vulnerable := package.get_non_vulnerable_versions()[0]:
return next_non_vulnerable.version
Expand Down Expand Up @@ -889,6 +1013,40 @@ def get_queryset(self):
return queryset


class PackageCommitPatchViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint that allows viewing PackageCommitPatch entries.
"""

queryset = PackageCommitPatch.objects.all()
serializer_class = PackageCommitPatchSerializer
throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle]

def get_queryset(self):
queryset = PackageCommitPatch.objects.all()
pk = self.request.query_params.get("id")
if pk:
queryset = queryset.filter(id=pk)
return queryset


class PatchViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint that allows viewing PackageCommitPatch entries.
"""

queryset = Patch.objects.all()
serializer_class = PatchSerializer
throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle]

def get_queryset(self):
queryset = Patch.objects.all()
pk = self.request.query_params.get("id")
if pk:
queryset = queryset.filter(id=pk)
return queryset


class CodeFixV2ViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoint that allows viewing CodeFix entries.
Expand Down
17 changes: 17 additions & 0 deletions vulnerabilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from cwe2.database import InvalidCWEError
from packageurl import PackageURL
from packageurl.contrib.django.utils import without_empty_values
from packageurl.contrib.purl2url import purl2url
from packageurl.contrib.url2purl import url2purl
from univers.version_range import RANGE_CLASS_BY_SCHEMES
from univers.version_range import AlpineLinuxVersionRange
from univers.version_range import NginxVersionRange
Expand Down Expand Up @@ -888,3 +890,18 @@ def group_advisories_by_content(advisories):
entry["secondary"].add(advisory)

return grouped


def get_patch_url(vcs_url, commit_hash):
"""
Generate patch URL from VCS URL and commit hash.
"""
if vcs_url.startswith("https://github.com"):
return f"{vcs_url}/commit/{commit_hash}.patch"
elif vcs_url.startswith("https://gitlab.com"):
return f"{vcs_url}/-/commit/{commit_hash}.patch"
elif vcs_url.startswith("https://bitbucket.org"):
return f"{vcs_url}/-/commit/{commit_hash}/raw"
elif vcs_url.startswith("https://git.kernel.org"):
return f"{vcs_url}.git/patch/?id={commit_hash}"
return
7 changes: 7 additions & 0 deletions vulnerablecode/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
from vulnerabilities.api import VulnerabilityViewSet
from vulnerabilities.api_v2 import CodeFixV2ViewSet
from vulnerabilities.api_v2 import CodeFixViewSet
from vulnerabilities.api_v2 import PackageCommitPatchViewSet
from vulnerabilities.api_v2 import PackageV2ViewSet
from vulnerabilities.api_v2 import PackageV3ViewSet
from vulnerabilities.api_v2 import PatchViewSet
from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet
from vulnerabilities.api_v2 import VulnerabilityV2ViewSet
from vulnerabilities.views import AdminLoginView
Expand Down Expand Up @@ -71,6 +73,11 @@ def __init__(self, *args, **kwargs):

api_v3_router.register("packages", PackageV3ViewSet, basename="package-v3")

api_v3_router.register(
"package_commit_patches", PackageCommitPatchViewSet, basename="package_commit_patch"
)
api_v3_router.register("patches", PatchViewSet, basename="patches")

urlpatterns = [
path("admin/login/", AdminLoginView.as_view(), name="admin-login"),
path("api/v2/", include(api_v2_router.urls)),
Expand Down
Loading