From a5657fa13d776fa39173ec2ce601c0f08b3b0271 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Wed, 27 May 2026 11:29:18 -0400 Subject: [PATCH] Add Linear connector for web-security --- .../web-security/agents/web-security.md | 1 + capabilities/web-security/capability.yaml | 6 + capabilities/web-security/mcp/linear.py | 284 ++++++++++++++++++ .../web-security/tests/test_linear_mcp.py | 242 +++++++++++++++ 4 files changed, 533 insertions(+) create mode 100644 capabilities/web-security/mcp/linear.py create mode 100644 capabilities/web-security/tests/test_linear_mcp.py diff --git a/capabilities/web-security/agents/web-security.md b/capabilities/web-security/agents/web-security.md index 143a658..af15186 100644 --- a/capabilities/web-security/agents/web-security.md +++ b/capabilities/web-security/agents/web-security.md @@ -100,6 +100,7 @@ You may also have tools from MCP servers. Check your tool schema for what's avai - **agent-browser**: Prefer running the local `agent-browser` CLI directly when it is available on `PATH`; it is the primary browser automation path. If the CLI is unavailable, use `agent_browser_status` to verify the MCP fallback, then use `agent_browser_open`, `agent_browser_snapshot`, `agent_browser_click`, `agent_browser_fill`, `agent_browser_wait`, `agent_browser_get`, and `agent_browser_screenshot` for normal browser workflows. Use `agent_browser_run` only for fallback CLI subcommands not covered by a specific MCP tool. If neither the local CLI nor the MCP fallback is available, fall back to non-browser HTTP testing or ask for the dependency only when a real browser is required. - **protoscope**: Prefer running the local `protoscope` CLI directly when it is available on `PATH`; it is the primary protobuf inspection and assembly path. If the CLI is unavailable, use `protoscope_status` to verify the MCP fallback. Use `protoscope_inspect_file` or `protoscope_inspect_hex` to decode binary protobuf payloads, and `protoscope_assemble_text` or `protoscope_assemble_file` to build binary protobuf bytes from Protoscope text. Use descriptor-set and message-type options when available to improve field names and enum output. - **hackerone**: Query HackerOne programs, scopes, reports, and hacktivity. Run `hackerone_health` first to verify credentials. Use `hackerone_get_program_scope` to enumerate in-scope assets before testing. Use `hackerone_search_hacktivity` to study previously disclosed vulnerabilities in a program. Use `hackerone_submit_report` only after the full reporting pipeline completes (assess_confidence → report-preflight → exploit-verifier → report-writer). Requires `H1_USERNAME` and `H1_API_TOKEN` env vars. +- **linear**: Create internal Linear remediation issues from validated findings. Run `linear_health` first to verify credentials. Use `linear_list_teams` to find the team UUID before creating issues. Use `linear_create_issue` only after the full reporting pipeline completes; include the validated report body, severity/priority mapping, and links to Dreadnode evidence or artifacts. Requires `LINEAR_API_KEY` or `LINEAR_ACCESS_TOKEN`. Scan and tool output is input to your OODA loop, not a deliverable. When a scan completes, orient on the results, prioritize leads by exploitability, load relevant skills, and begin active exploitation immediately. A completed scan is the start of your work, not the end. diff --git a/capabilities/web-security/capability.yaml b/capabilities/web-security/capability.yaml index f0d7149..5c55ff2 100644 --- a/capabilities/web-security/capability.yaml +++ b/capabilities/web-security/capability.yaml @@ -35,6 +35,12 @@ mcp: - "run" - "${CAPABILITY_ROOT}/mcp/hackerone.py" init_timeout: 60 + linear: + command: "uv" + args: + - "run" + - "${CAPABILITY_ROOT}/mcp/linear.py" + init_timeout: 60 agent-browser: command: "uv" args: diff --git a/capabilities/web-security/mcp/linear.py b/capabilities/web-security/mcp/linear.py new file mode 100644 index 0000000..93a3d8a --- /dev/null +++ b/capabilities/web-security/mcp/linear.py @@ -0,0 +1,284 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "fastmcp>=2.0", +# "httpx>=0.28", +# ] +# /// +"""Linear issue tools for web-security report export. + +Auth: LINEAR_API_KEY for personal API keys or LINEAR_ACCESS_TOKEN for OAuth. +Use these tools only after a web-security finding has passed validation. +""" + +from __future__ import annotations + +import os +from typing import Annotated, Any + +import httpx +from fastmcp import FastMCP + +_DEFAULT_API_URL = "https://api.linear.app/graphql" +MAX_OUTPUT_CHARS = 30_000 + +mcp = FastMCP("linear") + + +class _LinearClient: + """Lazy Linear GraphQL client.""" + + def __init__(self) -> None: + self._client: httpx.AsyncClient | None = None + + def _settings(self) -> tuple[str, str]: + api_url = os.environ.get("LINEAR_API_URL", _DEFAULT_API_URL).strip() + access_token = os.environ.get("LINEAR_ACCESS_TOKEN", "").strip() + api_key = os.environ.get("LINEAR_API_KEY", "").strip() + + if access_token: + return api_url, f"Bearer {access_token}" + if api_key: + return api_url, api_key + raise RuntimeError( + "Linear credentials not configured. " + "Set LINEAR_API_KEY or LINEAR_ACCESS_TOKEN." + ) + + async def get(self) -> httpx.AsyncClient: + if self._client is not None: + return self._client + + api_url, authorization = self._settings() + self._client = httpx.AsyncClient( + base_url=api_url, + timeout=30.0, + follow_redirects=True, + headers={ + "Authorization": authorization, + "Content-Type": "application/json", + }, + ) + return self._client + + async def graphql( + self, + query: str, + variables: dict[str, Any] | None = None, + ) -> dict[str, Any]: + client = await self.get() + resp = await client.post( + "", json={"query": query, "variables": variables or {}} + ) + if resp.status_code >= 400: + raise RuntimeError( + f"Linear GraphQL request failed: HTTP {resp.status_code}: " + f"{resp.text[:1000]}" + ) + + data = resp.json() + errors = data.get("errors") + if errors: + messages = "; ".join(str(error.get("message", error)) for error in errors) + raise RuntimeError(f"Linear GraphQL error: {messages}") + return data.get("data") or {} + + +_linear = _LinearClient() + + +def _truncate(value: str, limit: int = MAX_OUTPUT_CHARS) -> str: + if len(value) <= limit: + return value + return value[:limit] + f"\n... [TRUNCATED: {len(value)} chars total]" + + +def _drop_empty(value: dict[str, Any]) -> dict[str, Any]: + return { + key: item + for key, item in value.items() + if item is not None and item != "" and item != [] and item != {} + } + + +@mcp.tool +async def linear_health() -> str: + """Check Linear API connectivity and show the authenticated viewer.""" + data = await _linear.graphql( + """ + query Viewer { + viewer { + id + name + displayName + email + } + } + """ + ) + viewer = data.get("viewer") or {} + return ( + "Connected to Linear\n" + f" ID: {viewer.get('id', '?')}\n" + f" Name: {viewer.get('displayName') or viewer.get('name', '?')}\n" + f" Email: {viewer.get('email', '?')}" + ) + + +@mcp.tool +async def linear_list_teams( + first: Annotated[int, "Maximum teams to return"] = 50, +) -> str: + """List Linear teams available to the authenticated token.""" + data = await _linear.graphql( + """ + query Teams($first: Int!) { + teams(first: $first) { + nodes { + id + key + name + } + } + } + """, + {"first": min(first, 100)}, + ) + teams = ((data.get("teams") or {}).get("nodes")) or [] + if not teams: + return "No Linear teams found." + + lines = ["Linear teams:"] + for team in teams: + lines.append( + f" {team.get('id', '?')}\t{team.get('key', '?')}\t{team.get('name', '?')}" + ) + return "\n".join(lines) + + +@mcp.tool +async def linear_create_issue( + team_id: Annotated[str, "Linear team UUID"], + title: Annotated[str, "Issue title"], + description: Annotated[str, "Validated finding or report body in Markdown"], + priority: Annotated[ + int | None, + "Optional Linear priority: 0 none, 1 urgent, 2 high, 3 medium, 4 low", + ] = None, + assignee_id: Annotated[str | None, "Optional Linear user UUID"] = None, + project_id: Annotated[str | None, "Optional Linear project UUID"] = None, + state_id: Annotated[str | None, "Optional Linear workflow status UUID"] = None, + label_ids: Annotated[list[str] | None, "Optional Linear label UUIDs"] = None, +) -> str: + """Create a Linear issue from a validated web-security finding.""" + input_data = _drop_empty( + { + "teamId": team_id, + "title": title, + "description": description, + "priority": priority, + "assigneeId": assignee_id, + "projectId": project_id, + "stateId": state_id, + "labelIds": label_ids, + } + ) + + data = await _linear.graphql( + """ + mutation IssueCreate($input: IssueCreateInput!) { + issueCreate(input: $input) { + success + issue { + id + identifier + title + url + } + } + } + """, + {"input": input_data}, + ) + result = data.get("issueCreate") or {} + if not result.get("success"): + raise RuntimeError("Linear issueCreate returned success=false") + + issue = result.get("issue") or {} + return ( + f"Created Linear issue {issue.get('identifier', issue.get('id', '?'))}: " + f"{issue.get('url', '')}" + ).strip() + + +@mcp.tool +async def linear_get_issue( + issue_id: Annotated[str, "Linear issue UUID or identifier, e.g. ENG-123"], +) -> str: + """Get a Linear issue summary and description.""" + data = await _linear.graphql( + """ + query Issue($id: String!) { + issue(id: $id) { + id + identifier + title + url + priority + state { name } + assignee { name } + description + } + } + """, + {"id": issue_id}, + ) + issue = data.get("issue") + if not issue: + raise RuntimeError(f"Linear issue not found: {issue_id}") + + lines = [ + f"{issue.get('identifier', issue.get('id', '?'))}\t" + f"{(issue.get('state') or {}).get('name', '?')}\t" + f"priority={issue.get('priority', '?')}\t" + f"{issue.get('title', '?')}", + f"URL: {issue.get('url', '')}", + f"Assignee: {(issue.get('assignee') or {}).get('name', 'unassigned')}", + "", + "--- Description ---", + issue.get("description") or "", + ] + return _truncate("\n".join(lines)) + + +@mcp.tool +async def linear_add_comment( + issue_id: Annotated[str, "Linear issue UUID or identifier, e.g. ENG-123"], + body: Annotated[str, "Markdown comment body"], +) -> str: + """Add a comment to an existing Linear issue.""" + data = await _linear.graphql( + """ + mutation CommentCreate($input: CommentCreateInput!) { + commentCreate(input: $input) { + success + comment { + id + url + } + } + } + """, + {"input": {"issueId": issue_id, "body": body}}, + ) + result = data.get("commentCreate") or {} + if not result.get("success"): + raise RuntimeError("Linear commentCreate returned success=false") + + comment = result.get("comment") or {} + return f"Added Linear comment {comment.get('id', '?')}: {comment.get('url', '')}".strip() + + +if __name__ == "__main__": + mcp.run() diff --git a/capabilities/web-security/tests/test_linear_mcp.py b/capabilities/web-security/tests/test_linear_mcp.py new file mode 100644 index 0000000..263a34e --- /dev/null +++ b/capabilities/web-security/tests/test_linear_mcp.py @@ -0,0 +1,242 @@ +"""Tests for the Linear MCP server.""" + +from __future__ import annotations + +import importlib.util +import sys +import types +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + + +def _install_fastmcp_stub() -> None: + """Install a minimal fastmcp stub so linear.py can be imported.""" + fastmcp = types.ModuleType("fastmcp") + + class _FastMCP: + def __init__(self, name: str) -> None: + self.name = name + self._tools: dict[str, object] = {} + + def tool(self, fn): + self._tools[fn.__name__] = fn + return fn + + def run(self, **kwargs) -> None: + pass + + setattr(fastmcp, "FastMCP", _FastMCP) + sys.modules["fastmcp"] = fastmcp + + +_install_fastmcp_stub() + +MODULE_PATH = Path(__file__).resolve().parent.parent / "mcp" / "linear.py" +SPEC = importlib.util.spec_from_file_location("linear_mcp", MODULE_PATH) +assert SPEC and SPEC.loader +MODULE = importlib.util.module_from_spec(SPEC) +SPEC.loader.exec_module(MODULE) + +_LinearClient = MODULE._LinearClient + + +def _mock_response( + status_code: int = 200, + json_data: object = None, + text: str | None = None, +) -> httpx.Response: + kwargs: dict = { + "status_code": status_code, + "request": httpx.Request("POST", "https://api.linear.app/graphql"), + } + if json_data is not None: + kwargs["json"] = json_data + elif text is not None: + kwargs["text"] = text + return httpx.Response(**kwargs) + + +class TestLinearClient: + def test_settings_requires_credentials(self) -> None: + client = _LinearClient() + with patch.dict("os.environ", {}, clear=True): + with pytest.raises(RuntimeError, match="LINEAR_API_KEY"): + client._settings() + + def test_settings_prefers_oauth_access_token(self) -> None: + client = _LinearClient() + env = {"LINEAR_API_KEY": "api-key", "LINEAR_ACCESS_TOKEN": "oauth-token"} + with patch.dict("os.environ", env, clear=True): + api_url, authorization = client._settings() + + assert api_url == "https://api.linear.app/graphql" + assert authorization == "Bearer oauth-token" + + def test_settings_uses_personal_api_key_raw_authorization(self) -> None: + client = _LinearClient() + with patch.dict("os.environ", {"LINEAR_API_KEY": "api-key"}, clear=True): + _, authorization = client._settings() + + assert authorization == "api-key" + + @pytest.mark.asyncio + async def test_graphql_raises_for_graphql_errors(self) -> None: + client = _LinearClient() + mock_http = AsyncMock() + mock_http.post.return_value = _mock_response( + json_data={"errors": [{"message": "bad input"}]} + ) + client._client = mock_http + + with pytest.raises(RuntimeError, match="bad input"): + await client.graphql("query Test { viewer { id } }") + + +class TestHelpers: + def test_drop_empty_preserves_false_and_zero(self) -> None: + result = MODULE._drop_empty( + { + "empty": "", + "none": None, + "list": [], + "dict": {}, + "zero": 0, + "false": False, + "value": "x", + } + ) + assert result == {"zero": 0, "false": False, "value": "x"} + + +class TestTools: + @pytest.mark.asyncio + async def test_health_success(self) -> None: + data = { + "viewer": { + "id": "user-1", + "displayName": "Security Bot", + "email": "sec@example.com", + } + } + with patch.object(MODULE._linear, "graphql", return_value=data): + result = await MODULE.linear_health() + + assert "Connected to Linear" in result + assert "user-1" in result + assert "Security Bot" in result + + @pytest.mark.asyncio + async def test_list_teams_formats_rows(self) -> None: + data = { + "teams": { + "nodes": [ + {"id": "team-1", "key": "ENG", "name": "Engineering"}, + {"id": "team-2", "key": "SEC", "name": "Security"}, + ] + } + } + with patch.object(MODULE._linear, "graphql", return_value=data) as graphql: + result = await MODULE.linear_list_teams(first=10) + + assert "team-1\tENG\tEngineering" in result + assert "team-2\tSEC\tSecurity" in result + assert graphql.call_args.args[1] == {"first": 10} + + @pytest.mark.asyncio + async def test_create_issue_sends_expected_input(self) -> None: + data = { + "issueCreate": { + "success": True, + "issue": { + "id": "issue-1", + "identifier": "ENG-123", + "title": "Stored XSS", + "url": "https://linear.app/dreadnode/issue/ENG-123", + }, + } + } + with patch.object(MODULE._linear, "graphql", return_value=data) as graphql: + result = await MODULE.linear_create_issue( + team_id="team-1", + title="Stored XSS", + description="Validated report", + priority=2, + assignee_id="user-1", + project_id="project-1", + state_id="state-1", + label_ids=["label-1"], + ) + + assert result == ( + "Created Linear issue ENG-123: " + "https://linear.app/dreadnode/issue/ENG-123" + ) + variables = graphql.call_args.args[1] + assert variables["input"] == { + "teamId": "team-1", + "title": "Stored XSS", + "description": "Validated report", + "priority": 2, + "assigneeId": "user-1", + "projectId": "project-1", + "stateId": "state-1", + "labelIds": ["label-1"], + } + + @pytest.mark.asyncio + async def test_create_issue_raises_on_unsuccessful_mutation(self) -> None: + with patch.object( + MODULE._linear, + "graphql", + return_value={"issueCreate": {"success": False}}, + ): + with pytest.raises(RuntimeError, match="success=false"): + await MODULE.linear_create_issue( + team_id="team-1", + title="Stored XSS", + description="Validated report", + ) + + @pytest.mark.asyncio + async def test_get_issue_returns_summary(self) -> None: + data = { + "issue": { + "id": "issue-1", + "identifier": "ENG-123", + "title": "Stored XSS", + "url": "https://linear.app/dreadnode/issue/ENG-123", + "priority": 2, + "state": {"name": "Todo"}, + "assignee": {"name": "Security Bot"}, + "description": "Validated report", + } + } + with patch.object(MODULE._linear, "graphql", return_value=data): + result = await MODULE.linear_get_issue("ENG-123") + + assert "ENG-123\tTodo\tpriority=2\tStored XSS" in result + assert "Assignee: Security Bot" in result + assert "Validated report" in result + + @pytest.mark.asyncio + async def test_add_comment_returns_comment_summary(self) -> None: + data = { + "commentCreate": { + "success": True, + "comment": { + "id": "comment-1", + "url": "https://linear.app/comment/comment-1", + }, + } + } + with patch.object(MODULE._linear, "graphql", return_value=data) as graphql: + result = await MODULE.linear_add_comment("ENG-123", "new evidence") + + assert result == ( + "Added Linear comment comment-1: https://linear.app/comment/comment-1" + ) + variables = graphql.call_args.args[1] + assert variables["input"] == {"issueId": "ENG-123", "body": "new evidence"}