Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 273 additions & 6 deletions src/seclab_taskflows/mcp_servers/ghsa.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# SPDX-FileCopyrightText: GitHub, Inc.
# SPDX-License-Identifier: MIT

import logging

from fastmcp import FastMCP
Expand All @@ -6,7 +9,12 @@
import json
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file appears to lack the standard SPDX copyright/license header used by other modules in src/seclab_taskflows/mcp_servers/. Consider adding the SPDX header above the imports to keep licensing metadata consistent.

Copilot uses AI. Check for mistakes.
from urllib.parse import urlparse, parse_qs
from .gh_code_scanning import call_api
from seclab_taskflow_agent.path_utils import log_file_name
from seclab_taskflow_agent.path_utils import mcp_data_dir, log_file_name
from .ghsa_models import GHSA, GHSASummary, Base
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from .utils import process_repo

logging.basicConfig(
level=logging.DEBUG,
Expand All @@ -17,17 +25,150 @@

mcp = FastMCP("GitHubRepoAdvisories")

MEMORY = mcp_data_dir("seclab-taskflows", "ghsa", "GHSA_DIR")


def ghsa_to_dict(result):
    """Serialize a GHSA ORM row into a plain dict.

    The repo name is normalized to lowercase; every other field is copied
    through unchanged.  Key order matches the stored schema so JSON
    renderings stay stable.
    """
    out = {}
    for field in ("id", "ghsa_id", "repo", "severity", "cve_id",
                  "description", "summary", "published_at", "state"):
        out[field] = getattr(result, field)
    out["repo"] = out["repo"].lower()
    return out


def ghsa_summary_to_dict(summary):
    """Serialize a GHSASummary ORM row into a plain dict.

    The repo name is normalized to lowercase; counts and notes are copied
    through as-is.
    """
    result = {"id": summary.id, "repo": summary.repo.lower()}
    for field in ("total_advisories", "high_severity_count",
                  "medium_severity_count", "low_severity_count",
                  "summary_notes"):
        result[field] = getattr(summary, field)
    return result

class GHSABackend:
    """SQLite-backed persistence layer for GHSA advisories and per-repo summaries."""

    def __init__(self, db_dir: str):
        # Directory in which the GHSA SQLite database file will be stored.
        self.db_dir = db_dir
        # Previously a missing directory silently fell back to an in-memory
        # database ("sqlite://"), so stored advisories vanished with the
        # process.  Create the directory instead so data always persists.
        Path(self.db_dir).mkdir(parents=True, exist_ok=True)
        self.engine = create_engine(f"sqlite:///{self.db_dir}/ghsa.db", echo=False)
        # Only create the tables this backend owns.
        Base.metadata.create_all(
            self.engine,
            tables=[
                GHSA.__table__,
                GHSASummary.__table__,
            ],
        )

    def store_new_ghsa(self, repo, ghsa_id, severity, cve_id, description, summary, published_at, state):
        """Insert a GHSA row, or update the existing (repo, ghsa_id) row.

        Empty/falsy new values never overwrite previously stored fields,
        so a partial payload does not clobber known data.
        """
        with Session(self.engine) as session:
            existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first()
            if existing:
                if severity:
                    existing.severity = severity
                if cve_id:
                    existing.cve_id = cve_id
                if description:
                    existing.description = description
                if summary:
                    existing.summary = summary
                if published_at:
                    existing.published_at = published_at
                if state:
                    existing.state = state
            else:
                session.add(
                    GHSA(
                        repo=repo,
                        ghsa_id=ghsa_id,
                        severity=severity,
                        cve_id=cve_id,
                        description=description,
                        summary=summary,
                        published_at=published_at,
                        state=state,
                    )
                )
            session.commit()
        return f"Updated or added GHSA {ghsa_id} for {repo}"

    def get_ghsa(self, repo, ghsa_id):
        """Return one advisory as a dict, or None if it is not stored."""
        with Session(self.engine) as session:
            existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first()
            return ghsa_to_dict(existing) if existing else None

    def get_ghsas(self, repo):
        """Return all stored advisories for *repo* as a list of dicts."""
        with Session(self.engine) as session:
            rows = session.query(GHSA).filter_by(repo=repo).all()
            return [ghsa_to_dict(row) for row in rows]

    def store_new_ghsa_summary(
        self,
        repo,
        total_advisories,
        high_severity_count,
        medium_severity_count,
        low_severity_count,
        summary_notes,
    ):
        """Insert the per-repo advisory summary row, or overwrite it entirely."""
        with Session(self.engine) as session:
            existing = session.query(GHSASummary).filter_by(repo=repo).first()
            if existing:
                existing.total_advisories = total_advisories
                existing.high_severity_count = high_severity_count
                existing.medium_severity_count = medium_severity_count
                existing.low_severity_count = low_severity_count
                existing.summary_notes = summary_notes
            else:
                session.add(
                    GHSASummary(
                        repo=repo,
                        total_advisories=total_advisories,
                        high_severity_count=high_severity_count,
                        medium_severity_count=medium_severity_count,
                        low_severity_count=low_severity_count,
                        summary_notes=summary_notes,
                    )
                )
            session.commit()
        return f"Updated or added GHSA summary for {repo}"

    def get_ghsa_summary(self, repo):
        """Return the summary row for *repo* as a dict, or None if absent."""
        with Session(self.engine) as session:
            existing = session.query(GHSASummary).filter_by(repo=repo).first()
            return ghsa_summary_to_dict(existing) if existing else None

    def clear_repo(self, repo):
        """Delete all GHSA advisory rows and the summary row for *repo*."""
        with Session(self.engine) as session:
            session.query(GHSA).filter_by(repo=repo).delete()
            session.query(GHSASummary).filter_by(repo=repo).delete()
            session.commit()
        return f"Cleared GHSA results for repo {repo}"


backend = GHSABackend(MEMORY)

# The advisories contain a lot of information, so we need to filter
# some of it out to avoid exceeding the maximum prompt size.
def parse_advisory(advisory: dict) -> dict:
    """Reduce a raw GitHub advisory payload to the fields we keep.

    Uses ``advisory.get(key) or ""`` rather than a ``get`` default so that
    explicit ``None`` values in the payload are also normalized to empty
    strings.  (The diff residue that duplicated several keys has been
    removed; the later entries were the ones that took effect.)
    """
    logging.debug(f"advisory: {advisory}")
    return {
        "ghsa_id": advisory.get("ghsa_id") or "",
        "cve_id": advisory.get("cve_id") or "",
        "summary": advisory.get("summary") or "",
        "description": advisory.get("description") or "",
        "severity": advisory.get("severity") or "",
        "published_at": advisory.get("published_at") or "",
        "state": advisory.get("state") or "",
    }


Expand Down Expand Up @@ -70,6 +211,132 @@ async def fetch_GHSA_list(
return results
return json.dumps(results, indent=2)

@mcp.tool()
async def fetch_and_store_GHSA_list(
    owner: str = Field(description="The owner of the repo"), repo: str = Field(description="The repository name"),
    return_results: bool = Field(description="Whether to return the fetched results as a JSON string", default=False)
) -> str:
    """Fetch all GitHub Security Advisories (GHSAs) for a specific repository and store them in the database."""
    results = await fetch_GHSA_list_from_gh(owner, repo)
    if isinstance(results, str):
        # Upstream returns plain strings both for failures ("Request error:
        # ...", "HTTP error: ...") and for benign outcomes ("No advisories
        # found.").  Downstream prompts detect failures by an "Error:"
        # prefix, so normalize real errors without mislabeling the rest.
        if "error" in results.lower() and not results.startswith("Error:"):
            return f"Error: {results}"
        return results
    # Normalize the repo identifier once instead of per advisory.
    repo_name = process_repo(owner, repo)
    for advisory in results:
        backend.store_new_ghsa(
            repo_name,
            advisory["ghsa_id"],
            advisory["severity"],
            advisory["cve_id"],
            advisory["description"],
            advisory["summary"],
            advisory["published_at"],
            advisory["state"],
        )
    if return_results:
        return json.dumps(results, indent=2)
    return f"Fetched and stored {len(results)} GHSAs for {owner}/{repo}"
Comment on lines +219 to +236
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch_and_store_GHSA_list stores new GHSA rows but never clears existing rows for the repo. If the repo’s advisories are removed/withdrawn, or if a fetch returns "No advisories found." / an error, stale advisories (and any previous summary) will remain in the DB and be treated as current by downstream prompts. Consider clearing existing GHSA/GHSASummary for the repo at the start of this tool (or explicitly handling the no-advisories/error cases by clearing/updating the summary).

Copilot uses AI. Check for mistakes.

@mcp.tool()
def store_new_ghsa(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
    ghsa_id: str = Field(description="The GHSA ID of the advisory"),
    severity: str = Field(description="The severity of the advisory"),
    cve_id: str = Field(description="The CVE ID if available", default=""),
    description: str = Field(description="Description for this advisory", default=""),
    summary: str = Field(description="Summary for this advisory", default=""),
    published_at: str = Field(description="Published timestamp for this advisory", default=""),
    state: str = Field(description="State for this advisory (e.g. published, withdrawn)", default=""),
):
    """Store a GHSA advisory record in the database."""
    repo_name = process_repo(owner, repo)
    return backend.store_new_ghsa(
        repo_name,
        ghsa_id,
        severity,
        cve_id,
        description,
        summary,
        published_at,
        state,
    )

@mcp.tool()
def get_ghsa_from_db(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
    ghsa_id: str = Field(description="The GHSA ID of the advisory"),
):
    """Look up a single stored GHSA advisory and return it as JSON."""
    repo_name = process_repo(owner, repo)
    result = backend.get_ghsa(repo_name, ghsa_id)
    if result:
        return json.dumps(result)
    return f"Error: No GHSA entry exists in repo: {repo_name} and ghsa_id {ghsa_id}"


@mcp.tool()
def get_ghsas_for_repo_from_db(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
):
    """Return every stored GHSA advisory record for a repository as JSON."""
    repo_name = process_repo(owner, repo)
    advisories = backend.get_ghsas(repo_name)
    return json.dumps(advisories)

@mcp.tool()
def store_new_ghsa_summary(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
    total_advisories: int = Field(description="Total number of advisories"),
    high_severity_count: int = Field(description="Number of high severity advisories"),
    medium_severity_count: int = Field(description="Number of medium severity advisories"),
    low_severity_count: int = Field(description="Number of low severity advisories"),
    summary_notes: str = Field(description="Notes for the advisory summary", default=""),
):
    """Store GHSA summary statistics for a repository."""
    repo_name = process_repo(owner, repo)
    return backend.store_new_ghsa_summary(
        repo_name,
        total_advisories,
        high_severity_count,
        medium_severity_count,
        low_severity_count,
        summary_notes,
    )


@mcp.tool()
def update_ghsa_summary_notes(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
    summary_notes: str = Field(description="New notes for the advisory summary", default=""),
):
    """Replace the summary notes on an existing GHSA summary for a repository.

    All other summary statistics are preserved unchanged.  Returns an
    error string when no summary has been stored for the repository yet.
    """
    repo_name = process_repo(owner, repo)
    existing = backend.get_ghsa_summary(repo_name)
    if not existing:
        return f"Error: No GHSA summary exists in repo: {repo_name}"
    return backend.store_new_ghsa_summary(
        repo_name,
        existing["total_advisories"],
        existing["high_severity_count"],
        existing["medium_severity_count"],
        existing["low_severity_count"],
        summary_notes,
    )


@mcp.tool()
def get_ghsa_summary(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
):
    """Fetch the stored GHSA summary for a repository and return it as JSON."""
    repo_name = process_repo(owner, repo)
    result = backend.get_ghsa_summary(repo_name)
    if result:
        return json.dumps(result)
    return f"Error: No GHSA summary exists in repo: {repo_name}"


@mcp.tool()
def clear_repo(
    owner: str = Field(description="The owner of the GitHub repository"),
    repo: str = Field(description="The name of the GitHub repository"),
):
    """Delete all stored GHSA advisory and summary records for a repository."""
    repo_name = process_repo(owner, repo)
    return backend.clear_repo(repo_name)


async def fetch_GHSA_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str | dict:
"""Fetch the details of a repository security advisory."""
Expand Down
48 changes: 48 additions & 0 deletions src/seclab_taskflows/mcp_servers/ghsa_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: GitHub, Inc.
# SPDX-License-Identifier: MIT

from sqlalchemy import Text
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from typing import Optional


class Base(DeclarativeBase):
    """Shared SQLAlchemy declarative base for the GHSA ORM models."""
    pass

class GHSA(Base):
    """ORM row for a single GitHub Security Advisory attached to a repository."""

    __tablename__ = "ghsa"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # GitHub advisory identifier (e.g. "GHSA-xxxx-...").
    ghsa_id: Mapped[str]
    # Repository identifier -- presumably "owner/name"; callers appear to
    # normalize via process_repo() -- TODO confirm.
    repo: Mapped[str]
    severity: Mapped[str]
    # The remaining fields are optional: advisories may lack a CVE, body
    # text, publication timestamp, or state.
    cve_id: Mapped[Optional[str]] = mapped_column(nullable=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    summary: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    published_at: Mapped[Optional[str]] = mapped_column(nullable=True)
    state: Mapped[Optional[str]] = mapped_column(nullable=True)

    def __repr__(self):
        # Debug-friendly representation including every mapped column.
        return (
            f"<GHSA(id={self.id}, ghsa_id={self.ghsa_id}, repo={self.repo}, "
            f"severity={self.severity}, cve_id={self.cve_id}, description={self.description}, summary={self.summary}, "
            f"published_at={self.published_at}, state={self.state})>"
        )

class GHSASummary(Base):
    """ORM row holding aggregate advisory statistics for one repository."""

    __tablename__ = "ghsa_summary"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    repo: Mapped[str]
    total_advisories: Mapped[int]
    high_severity_count: Mapped[int]
    medium_severity_count: Mapped[int]
    low_severity_count: Mapped[int]
    # Free-form notes; optional.
    summary_notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    def __repr__(self):
        # Debug-friendly representation including every mapped column.
        return (
            "<GHSASummary("
            f"id={self.id}, repo={self.repo}, total_advisories={self.total_advisories}, "
            f"high_severity_count={self.high_severity_count}, medium_severity_count={self.medium_severity_count}, "
            f"low_severity_count={self.low_severity_count}, summary_notes={self.summary_notes})>"
        )
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ seclab-taskflow-agent:
prompt: |
## Known Security Advisories for this Repository

Fetch the security advisories for {{ globals.repo }} from memcache (stored under the key 'security_advisories_{{ globals.repo }}'). If the value in the memcache is null or an error message, clearly state that no advisories are available and skip advisory analysis. Otherwise, state how many advisories were found.
Fetch the security advisories for {{ globals.repo }} from the GHSASummary and GHSA entries
stored in the database. Do not fetch them from GitHub directly.
If the database contains no GHSA entries for this repository, or the lookup returns an error message, clearly state that no advisories are available and skip advisory analysis.
Otherwise, state how many advisories were found.
Review these advisories and consider them when identifying security risks. If you identify code that is similar to a known advisory pattern, highlight that connection.
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ taskflow:
toolboxes:
- seclab_taskflows.toolboxes.repo_context
- seclab_taskflows.toolboxes.local_file_viewer
- seclab_taskflow_agent.toolboxes.memcache
- seclab_taskflows.toolboxes.ghsa
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ taskflow:
toolboxes:
- seclab_taskflows.toolboxes.repo_context
- seclab_taskflows.toolboxes.local_file_viewer
- seclab_taskflow_agent.toolboxes.memcache
- seclab_taskflows.toolboxes.ghsa
Loading
Loading