diff --git a/src/seclab_taskflows/mcp_servers/ghsa.py b/src/seclab_taskflows/mcp_servers/ghsa.py
index 4611c71..b9235ac 100644
--- a/src/seclab_taskflows/mcp_servers/ghsa.py
+++ b/src/seclab_taskflows/mcp_servers/ghsa.py
@@ -1,3 +1,6 @@
+# SPDX-FileCopyrightText: GitHub, Inc.
+# SPDX-License-Identifier: MIT
+
 import logging
 from fastmcp import FastMCP
@@ -6,7 +9,12 @@ import json
 from urllib.parse import urlparse, parse_qs
 from .gh_code_scanning import call_api
-from seclab_taskflow_agent.path_utils import log_file_name
+from seclab_taskflow_agent.path_utils import mcp_data_dir, log_file_name
+from .ghsa_models import GHSA, GHSASummary, Base
+from pathlib import Path
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session
+from .utils import process_repo
 logging.basicConfig(
     level=logging.DEBUG,
@@ -17,17 +25,180 @@ mcp = FastMCP("GitHubRepoAdvisories")
+MEMORY = mcp_data_dir("seclab-taskflows", "ghsa", "GHSA_DIR")
+
+
+def ghsa_to_dict(result):
+    """Return the columns of a GHSA row as a plain dict (repo lower-cased)."""
+    return {
+        "id": result.id,
+        "ghsa_id": result.ghsa_id,
+        "repo": result.repo.lower(),
+        "severity": result.severity,
+        "cve_id": result.cve_id,
+        "description": result.description,
+        "summary": result.summary,
+        "published_at": result.published_at,
+        "state": result.state,
+    }
+
+
+def ghsa_summary_to_dict(summary):
+    """Return the columns of a GHSASummary row as a plain dict (repo lower-cased)."""
+    return {
+        "id": summary.id,
+        "repo": summary.repo.lower(),
+        "total_advisories": summary.total_advisories,
+        "high_severity_count": summary.high_severity_count,
+        "medium_severity_count": summary.medium_severity_count,
+        "low_severity_count": summary.low_severity_count,
+        "summary_notes": summary.summary_notes,
+    }
+
+class GHSABackend:
+    """Store for GHSA advisories and per-repository summaries, backed by SQLite."""
+
+    def __init__(self, db_dir: str):
+        """Create the engine for ``db_dir`` and make sure both tables exist.
+
+        If ``db_dir`` does not exist, an in-memory SQLite database is used
+        instead and a warning is logged.
+        """
+        # Directory in which the GHSA SQLite database file will be stored.
+        self.db_dir = db_dir
+        if Path(self.db_dir).exists():
+            db_uri = f"sqlite:///{self.db_dir}/ghsa.db"
+        else:
+            # An in-memory database is discarded when this process exits, so
+            # make the fallback visible instead of losing data silently.
+            logging.warning(
+                "GHSA data directory %s does not exist; using an in-memory database", self.db_dir
+            )
+            db_uri = "sqlite://"
+        self.engine = create_engine(db_uri, echo=False)
+        Base.metadata.create_all(
+            self.engine,
+            tables=[
+                GHSA.__table__,
+                GHSASummary.__table__,
+            ],
+        )
+
+    def store_new_ghsa(self, repo, ghsa_id, severity, cve_id, description, summary, published_at, state):
+        """Insert an advisory, or update the row already stored for (repo, ghsa_id).
+
+        On update, only truthy arguments overwrite the stored values. Returns a
+        status message.
+        """
+        with Session(self.engine) as session:
+            existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first()
+            if existing:
+                if severity:
+                    existing.severity = severity
+                if cve_id:
+                    existing.cve_id = cve_id
+                if description:
+                    existing.description = description
+                if summary:
+                    existing.summary = summary
+                if published_at:
+                    existing.published_at = published_at
+                if state:
+                    existing.state = state
+            else:
+                new_ghsa = GHSA(
+                    repo=repo,
+                    ghsa_id=ghsa_id,
+                    severity=severity,
+                    cve_id=cve_id,
+                    description=description,
+                    summary=summary,
+                    published_at=published_at,
+                    state=state,
+                )
+                session.add(new_ghsa)
+            session.commit()
+        return f"Updated or added GHSA {ghsa_id} for {repo}"
+
+    def get_ghsa(self, repo, ghsa_id):
+        """Return the advisory for (repo, ghsa_id) as a dict, or None if there is none."""
+        with Session(self.engine) as session:
+            existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first()
+            if not existing:
+                return None
+            return ghsa_to_dict(existing)
+
+    def get_ghsas(self, repo):
+        """Return every advisory stored for ``repo`` as a list of dicts."""
+        with Session(self.engine) as session:
+            existing = session.query(GHSA).filter_by(repo=repo).all()
+            return [ghsa_to_dict(ghsa) for ghsa in existing]
+
+    def store_new_ghsa_summary(
+        self,
+        repo,
+        total_advisories,
+        high_severity_count,
+        medium_severity_count,
+        low_severity_count,
+        summary_notes,
+    ):
+        """Insert the summary for ``repo``, or overwrite every field of the existing one.
+
+        Returns a status message.
+        """
+        with Session(self.engine) as session:
+            existing = session.query(GHSASummary).filter_by(repo=repo).first()
+            if existing:
+                existing.total_advisories = total_advisories
+                existing.high_severity_count = high_severity_count
+                existing.medium_severity_count = medium_severity_count
+                existing.low_severity_count = low_severity_count
+                existing.summary_notes = summary_notes
+            else:
+                new_summary = GHSASummary(
+                    repo=repo,
+                    total_advisories=total_advisories,
+                    high_severity_count=high_severity_count,
+                    medium_severity_count=medium_severity_count,
+                    low_severity_count=low_severity_count,
+                    summary_notes=summary_notes,
+                )
+                session.add(new_summary)
+            session.commit()
+        return f"Updated or added GHSA summary for {repo}"
+
+    def get_ghsa_summary(self, repo):
+        """Return the summary for ``repo`` as a dict, or None if there is none."""
+        with Session(self.engine) as session:
+            existing = session.query(GHSASummary).filter_by(repo=repo).first()
+            if not existing:
+                return None
+            return ghsa_summary_to_dict(existing)
+
+    def clear_repo(self, repo):
+        """Delete the advisory rows and summary rows stored for ``repo``."""
+        with Session(self.engine) as session:
+            session.query(GHSA).filter_by(repo=repo).delete()
+            session.query(GHSASummary).filter_by(repo=repo).delete()
+            session.commit()
+        return f"Cleared GHSA results for repo {repo}"
+
+
+backend = GHSABackend(MEMORY)
 # The advisories contain a lot of information, so we need to filter
 # some of it out to avoid exceeding the maximum prompt size.
def parse_advisory(advisory: dict) -> dict: logging.debug(f"advisory: {advisory}") return { - "ghsa_id": advisory.get("ghsa_id", ""), - "cve_id": advisory.get("cve_id", ""), - "summary": advisory.get("summary", ""), - "published_at": advisory.get("published_at", ""), - "state": advisory.get("state", ""), + "ghsa_id": advisory.get("ghsa_id") or "", + "cve_id": advisory.get("cve_id") or "", + "summary": advisory.get("summary") or "", + "description": advisory.get("description") or "", + "severity": advisory.get("severity") or "", + "published_at": advisory.get("published_at") or "", + "state": advisory.get("state") or "", } @@ -70,6 +211,132 @@ async def fetch_GHSA_list( return results return json.dumps(results, indent=2) +@mcp.tool() +async def fetch_and_store_GHSA_list( + owner: str = Field(description="The owner of the repo"), repo: str = Field(description="The repository name"), + return_results: bool = Field(description="Whether to return the fetched results as a JSON string", default=False) +) -> str: + """Fetch all GitHub Security Advisories (GHSAs) for a specific repository and store them in the database.""" + results = await fetch_GHSA_list_from_gh(owner, repo) + if isinstance(results, str): + return results + for advisory in results: + backend.store_new_ghsa( + process_repo(owner, repo), + advisory["ghsa_id"], + advisory["severity"], + advisory["cve_id"], + advisory["description"], + advisory["summary"], + advisory["published_at"], + advisory["state"], + ) + if return_results: + return json.dumps(results, indent=2) + return f"Fetched and stored {len(results)} GHSAs for {owner}/{repo}" + +@mcp.tool() +def store_new_ghsa( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), + ghsa_id: str = Field(description="The GHSA ID of the advisory"), + severity: str = Field(description="The severity of the advisory"), + cve_id: str = Field(description="The CVE ID if available", 
default=""), + description: str = Field(description="Description for this advisory", default=""), + summary: str = Field(description="Summary for this advisory", default=""), + published_at: str = Field(description="Published timestamp for this advisory", default=""), + state: str = Field(description="State for this advisory (e.g. published, withdrawn)", default=""), +): + """Store a GHSA advisory record in the database.""" + return backend.store_new_ghsa( + process_repo(owner, repo), ghsa_id, severity, cve_id, description, summary, published_at, state + ) + +@mcp.tool() +def get_ghsa_from_db( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), + ghsa_id: str = Field(description="The GHSA ID of the advisory"), +): + """Get a GHSA advisory record from the database.""" + repo_name = process_repo(owner, repo) + result = backend.get_ghsa(repo_name, ghsa_id) + if not result: + return f"Error: No GHSA entry exists in repo: {repo_name} and ghsa_id {ghsa_id}" + return json.dumps(result) + + +@mcp.tool() +def get_ghsas_for_repo_from_db( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), +): + """Get all GHSA advisory records for a repository.""" + return json.dumps(backend.get_ghsas(process_repo(owner, repo))) + +@mcp.tool() +def store_new_ghsa_summary( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), + total_advisories: int = Field(description="Total number of advisories"), + high_severity_count: int = Field(description="Number of high severity advisories"), + medium_severity_count: int = Field(description="Number of medium severity advisories"), + low_severity_count: int = Field(description="Number of low severity advisories"), + summary_notes: str = Field(description="Notes for the advisory summary", 
default=""), +): + """Store GHSA summary statistics for a repository.""" + return backend.store_new_ghsa_summary( + process_repo(owner, repo), + total_advisories, + high_severity_count, + medium_severity_count, + low_severity_count, + summary_notes, + ) + + +@mcp.tool() +def update_ghsa_summary_notes( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), + summary_notes: str = Field(description="New notes for the advisory summary", default=""), +): + """Update summary notes for the GHSA summary for a repository.""" + repo_name = process_repo(owner, repo) + existing = backend.get_ghsa_summary(repo_name) + if not existing: + return f"Error: No GHSA summary exists in repo: {repo_name}" + return backend.store_new_ghsa_summary( + repo_name, + existing["total_advisories"], + existing["high_severity_count"], + existing["medium_severity_count"], + existing["low_severity_count"], + summary_notes, + ) + + +@mcp.tool() +def get_ghsa_summary( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), +): + """Get the GHSA summary for a repository.""" + repo_name = process_repo(owner, repo) + result = backend.get_ghsa_summary(repo_name) + if not result: + return f"Error: No GHSA summary exists in repo: {repo_name}" + return json.dumps(result) + + +@mcp.tool() +def clear_repo( + owner: str = Field(description="The owner of the GitHub repository"), + repo: str = Field(description="The name of the GitHub repository"), +): + """Clear GHSA and GHSA summary records for a repository.""" + return backend.clear_repo(process_repo(owner, repo)) + async def fetch_GHSA_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str | dict: """Fetch the details of a repository security advisory.""" diff --git a/src/seclab_taskflows/mcp_servers/ghsa_models.py b/src/seclab_taskflows/mcp_servers/ghsa_models.py new file mode 100644 
index 0000000..4a1cfe2
--- /dev/null
+++ b/src/seclab_taskflows/mcp_servers/ghsa_models.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: GitHub, Inc.
+# SPDX-License-Identifier: MIT
+
+from sqlalchemy import Text
+from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
+from typing import Optional
+
+
+class Base(DeclarativeBase):
+    """Declarative base shared by the GHSA ORM models."""
+
+    pass
+
+class GHSA(Base):
+    """One security advisory stored for a repository."""
+
+    __tablename__ = "ghsa"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    ghsa_id: Mapped[str]
+    repo: Mapped[str]
+    severity: Mapped[str]
+    cve_id: Mapped[Optional[str]] = mapped_column(nullable=True)
+    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
+    summary: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
+    published_at: Mapped[Optional[str]] = mapped_column(nullable=True)
+    state: Mapped[Optional[str]] = mapped_column(nullable=True)
+
+    def __repr__(self):
+        """Return a short debugging representation (long text columns omitted)."""
+        return (
+            f"GHSA(id={self.id!r}, ghsa_id={self.ghsa_id!r}, repo={self.repo!r}, "
+            f"severity={self.severity!r}, cve_id={self.cve_id!r}, state={self.state!r})"
+        )
+
+class GHSASummary(Base):
+    """Aggregate advisory counts and free-form notes for a repository."""
+
+    __tablename__ = "ghsa_summary"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    repo: Mapped[str]
+    total_advisories: Mapped[int]
+    high_severity_count: Mapped[int]
+    medium_severity_count: Mapped[int]
+    low_severity_count: Mapped[int]
+    summary_notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
+
+    def __repr__(self):
+        """Return a short debugging representation (notes omitted)."""
+        return (
+            f"GHSASummary(id={self.id!r}, repo={self.repo!r}, total_advisories={self.total_advisories!r}, "
+            f"high={self.high_severity_count!r}, medium={self.medium_severity_count!r}, "
+            f"low={self.low_severity_count!r})"
+        )
diff --git a/src/seclab_taskflows/prompts/audit/known_security_advisories.yaml b/src/seclab_taskflows/prompts/audit/known_security_advisories.yaml index 6c5cb11..103dcc9 100644 --- a/src/seclab_taskflows/prompts/audit/known_security_advisories.yaml +++ b/src/seclab_taskflows/prompts/audit/known_security_advisories.yaml @@ -7,5 +7,8 @@ seclab-taskflow-agent: prompt: | ## Known Security Advisories for this Repository - Fetch the security advisories for {{ globals.repo }} from memcache (stored under the key 'security_advisories_{{ globals.repo }}'). If the value in the memcache is null or an error message, clearly state that no advisories are available and skip advisory analysis. 
Otherwise, state how many advisories were found. + Fetch the security advisories for {{ globals.repo }} from the GHSASummary and GHSA entries + stored in the database. Do not fetch them from GitHub directly. + If the value in the database is null or an error message, clearly state that no advisories are available and skip advisory analysis. + Otherwise, state how many advisories were found. Review these advisories and consider them when identifying security risks. If you identify code that is similar to a known advisory pattern, highlight that connection. diff --git a/src/seclab_taskflows/taskflows/audit/audit_issue_local_iter.yaml b/src/seclab_taskflows/taskflows/audit/audit_issue_local_iter.yaml index 5e197ff..16ca7aa 100644 --- a/src/seclab_taskflows/taskflows/audit/audit_issue_local_iter.yaml +++ b/src/seclab_taskflows/taskflows/audit/audit_issue_local_iter.yaml @@ -50,4 +50,4 @@ taskflow: toolboxes: - seclab_taskflows.toolboxes.repo_context - seclab_taskflows.toolboxes.local_file_viewer - - seclab_taskflow_agent.toolboxes.memcache + - seclab_taskflows.toolboxes.ghsa \ No newline at end of file diff --git a/src/seclab_taskflows/taskflows/audit/classify_application_local.yaml b/src/seclab_taskflows/taskflows/audit/classify_application_local.yaml index 9f44ccc..aa9c926 100644 --- a/src/seclab_taskflows/taskflows/audit/classify_application_local.yaml +++ b/src/seclab_taskflows/taskflows/audit/classify_application_local.yaml @@ -90,4 +90,4 @@ taskflow: toolboxes: - seclab_taskflows.toolboxes.repo_context - seclab_taskflows.toolboxes.local_file_viewer - - seclab_taskflow_agent.toolboxes.memcache + - seclab_taskflows.toolboxes.ghsa \ No newline at end of file diff --git a/src/seclab_taskflows/taskflows/audit/fetch_security_advisories.yaml b/src/seclab_taskflows/taskflows/audit/fetch_security_advisories.yaml index ce7afcb..c6234ff 100644 --- a/src/seclab_taskflows/taskflows/audit/fetch_security_advisories.yaml +++ 
b/src/seclab_taskflows/taskflows/audit/fetch_security_advisories.yaml @@ -19,18 +19,15 @@ taskflow: - seclab_taskflow_agent.personalities.assistant model: general_tasks user_prompt: | - Fetch all GitHub Security Advisories (GHSAs) for the repo {{ globals.repo }}. + Fetch and store all GitHub Security Advisories (GHSAs) for the repo {{ globals.repo }} + with the return_results option set to true. - If an error occurs during fetching, store the error message in memcache under the key 'security_advisories_{{ globals.repo }}'. + If an error occurs during fetching, store a GHSASummary with the error message. Ensure the error message starts with "Error:" followed by a description of the error. - If fetching is successful, store the list of advisories in memcache under the key 'security_advisories_{{ globals.repo }}'. - - If one or more advisories are found, provide a summary of the findings including: - 1. How many advisories were found - 2. The severity levels of the advisories + If one or more advisories are found, provide a summary of the findings including + the ghsa_id of each advisory, followed by the severity and type of vulnerability, together with the location of the vulnerable code if available, + and store it in a GHSASummary entry in the database. toolboxes: - seclab_taskflows.toolboxes.ghsa - - seclab_taskflow_agent.toolboxes.memcache - seclab_taskflows.toolboxes.local_file_viewer - - seclab_taskflows.toolboxes.gh_file_viewer diff --git a/src/seclab_taskflows/toolboxes/ghsa.yaml b/src/seclab_taskflows/toolboxes/ghsa.yaml index 1b1cd41..099c73e 100644 --- a/src/seclab_taskflows/toolboxes/ghsa.yaml +++ b/src/seclab_taskflows/toolboxes/ghsa.yaml @@ -11,3 +11,4 @@ server_params: env: GH_TOKEN: "{{ env('GH_TOKEN') }}" LOG_DIR: "{{ env('LOG_DIR') }}" + GHSA_DIR: "{{ env('DATA_DIR') }}"