Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions codeframe/core/proof/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ def init_proof_tables(workspace: Workspace) -> None:
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS pr_proof_snapshots (
pr_number INTEGER NOT NULL,
workspace_id TEXT NOT NULL,
gates_passed INTEGER NOT NULL,
gates_total INTEGER NOT NULL,
gate_breakdown TEXT NOT NULL,
snapshotted_at TEXT NOT NULL,
PRIMARY KEY (pr_number, workspace_id)
)
""")

conn.commit()
conn.close()

Expand All @@ -101,6 +113,11 @@ def _ensure_tables(workspace: Workspace) -> None:
"SELECT name FROM sqlite_master WHERE type='table' AND name='proof_runs'"
)
missing = not cursor.fetchone()
if not missing:
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='pr_proof_snapshots'"
)
missing = not cursor.fetchone()
conn.close()
if missing:
init_proof_tables(workspace)
Expand Down Expand Up @@ -493,3 +510,65 @@ def check_expired_waivers(workspace: Workspace) -> list[Requirement]:
conn.commit()
conn.close()
return expired


# --- PR Proof Snapshots ---


def save_pr_proof_snapshot(
    workspace: Workspace,
    pr_number: int,
    gates_passed: int,
    gates_total: int,
    gate_breakdown: list[dict],
) -> None:
    """Save a proof snapshot for a PR at creation time.

    Upserts into ``pr_proof_snapshots`` keyed on (pr_number, workspace_id),
    so saving again for the same PR replaces the earlier row.

    Args:
        workspace: Workspace whose ledger database to write to.
        pr_number: Pull request number the snapshot belongs to.
        gates_passed: Number of gate entries counted as passed.
        gates_total: Total number of gate entries in the snapshot.
        gate_breakdown: List of dicts (serialized to JSON for storage);
            each entry carries per-gate status.
    """
    _ensure_tables(workspace)
    conn = get_db_connection(workspace)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """INSERT OR REPLACE INTO pr_proof_snapshots
            (pr_number, workspace_id, gates_passed, gates_total,
             gate_breakdown, snapshotted_at)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (
                pr_number,
                workspace.id,
                gates_passed,
                gates_total,
                json.dumps(gate_breakdown),
                _utc_now().isoformat(),
            ),
        )
        conn.commit()
    finally:
        # Close even when execute/commit raises so the connection
        # (and any SQLite file lock) is never leaked.
        conn.close()


def get_pr_proof_snapshot(
    workspace: Workspace, pr_number: int
) -> Optional[dict]:
    """Fetch a proof snapshot for a PR.

    Args:
        workspace: Workspace whose ledger database to read from.
        pr_number: Pull request number to look up.

    Returns:
        Dict with pr_number, gates_passed, gates_total, gate_breakdown,
        snapshotted_at — or None if not found.
    """
    _ensure_tables(workspace)
    conn = get_db_connection(workspace)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """SELECT pr_number, gates_passed, gates_total, gate_breakdown, snapshotted_at
            FROM pr_proof_snapshots WHERE pr_number = ? AND workspace_id = ?""",
            (pr_number, workspace.id),
        )
        row = cursor.fetchone()
    finally:
        # Close on every path, including a failed SELECT, so the
        # connection is never leaked.
        conn.close()
    if row is None:
        return None
    return {
        "pr_number": row[0],
        "gates_passed": row[1],
        "gates_total": row[2],
        # Stored as JSON text; decode back to the original list of dicts.
        "gate_breakdown": json.loads(row[3]),
        "snapshotted_at": row[4],
    }
5 changes: 5 additions & 0 deletions codeframe/git/github_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class PRDetails:
merged_at: Optional[datetime]
head_branch: str
base_branch: str
author: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -220,6 +221,9 @@ def _parse_pr_response(self, data: Dict[str, Any]) -> PRDetails:
data["merged_at"].replace("Z", "+00:00")
)

user = data.get("user")
author = user.get("login") if isinstance(user, dict) else None

return PRDetails(
number=data["number"],
url=data["html_url"],
Expand All @@ -230,6 +234,7 @@ def _parse_pr_response(self, data: Dict[str, Any]) -> PRDetails:
merged_at=merged_at,
head_branch=data["head"]["ref"],
base_branch=data["base"]["ref"],
author=author,
)

async def create_pull_request(
Expand Down
140 changes: 140 additions & 0 deletions codeframe/ui/routers/pr_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,39 @@ class PRStatusResponse(BaseModel):
pr_number: int


class GateBreakdownItem(BaseModel):
    """A single gate pass/fail entry in a proof snapshot."""

    # Gate identifier (stored from the obligation's gate enum value).
    gate: str
    # Obligation status string (e.g. "satisfied" or "failed").
    status: str


class ProofSnapshotOut(BaseModel):
    """Proof snapshot at time of PR creation."""

    # Count of gate entries whose obligation status was "satisfied".
    gates_passed: int
    # Total number of gate entries recorded in the snapshot.
    gates_total: int
    # Per-gate status entries, in the order they were snapshotted.
    gate_breakdown: list[GateBreakdownItem]


class PRHistoryItem(BaseModel):
    """A single merged PR with optional proof snapshot."""

    # Pull request number.
    number: int
    # PR title as returned by the GitHub API.
    title: str
    # ISO-8601 merge timestamp (serialized from the PR's merged_at datetime).
    merged_at: str
    # PR author login; None when the API response carried no user object.
    author: Optional[str]
    # HTML URL of the pull request.
    url: str
    # Proof snapshot captured for this PR, or None if none was ever saved.
    proof_snapshot: Optional[ProofSnapshotOut]


class PRHistoryResponse(BaseModel):
    """Response for PR history list."""

    # Merged PRs, sorted newest-first by merge time.
    pull_requests: list[PRHistoryItem]
    # Count of items returned in this response (after the limit is applied),
    # not the total number of merged PRs in the repository.
    total: int


# ============================================================================
# Helper Functions
# ============================================================================
Expand Down Expand Up @@ -285,6 +318,78 @@ async def list_pull_requests(
await client.close()


@router.get("/history", response_model=PRHistoryResponse)
@rate_limit_standard()
async def get_pr_history(
request: Request,
limit: int = Query(10, ge=1, le=50),
workspace: Workspace = Depends(get_v2_workspace),
) -> PRHistoryResponse:
"""List recently merged PRs with proof snapshots.

Returns merged PRs sorted by merged_at descending, each with an
optional proof snapshot showing gate pass/fail at PR creation time.

Args:
limit: Maximum number of PRs to return (1-50, default 10)
workspace: v2 Workspace

Returns:
PRHistoryResponse with merged PRs and proof snapshots
"""
from codeframe.core.proof.ledger import get_pr_proof_snapshot

client = _get_github_client()
try:
prs = await client.list_pull_requests(state="closed")

# Filter to only merged PRs and sort newest first.
merged = [pr for pr in prs if pr.merged_at is not None]
merged.sort(key=lambda pr: pr.merged_at, reverse=True)
merged = merged[:limit]
Comment on lines +344 to +349
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

/history can miss the actual latest merged PRs.

This only examines GitHub's first page of state="closed" PRs and then sorts that subset by merged_at. A long-lived PR merged recently can fall off page 1, so the history view can omit the real latest merged PRs. Paginate until you collect limit merged PRs, or query GitHub by merge time directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/pr_v2.py` around lines 344 - 349, The current logic only
inspects the first page returned by client.list_pull_requests(state="closed")
and then sorts that subset, which can miss recently merged PRs; change
list_pull_requests usage to paginate (fetch subsequent pages) until you've
collected at least `limit` PRs with pr.merged_at not None or there are no more
pages, accumulate merged PRs across pages into `merged`, then sort merged by
pr.merged_at descending and truncate to `limit`; update the code that builds
`merged` (the list comprehension and subsequent sort) and the call site of
client.list_pull_requests to support pagination.


items: list[PRHistoryItem] = []
for pr in merged:
snapshot = get_pr_proof_snapshot(workspace, pr.number)
proof_snapshot = None
if snapshot:
proof_snapshot = ProofSnapshotOut(
gates_passed=snapshot["gates_passed"],
gates_total=snapshot["gates_total"],
gate_breakdown=[
GateBreakdownItem(**g) for g in snapshot["gate_breakdown"]
],
)
items.append(
PRHistoryItem(
number=pr.number,
title=pr.title,
merged_at=pr.merged_at.isoformat(),
author=pr.author,
url=pr.url,
proof_snapshot=proof_snapshot,
)
)

return PRHistoryResponse(pull_requests=items, total=len(items))

except GitHubAPIError as e:
raise HTTPException(
status_code=e.status_code,
detail=api_error("GitHub API error", ErrorCodes.EXECUTION_FAILED, e.message),
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get PR history: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=api_error("Failed to get PR history", ErrorCodes.EXECUTION_FAILED, str(e)),
)
finally:
await client.close()


@router.get("/{pr_number}", response_model=PRResponse)
@rate_limit_standard()
async def get_pull_request(
Expand Down Expand Up @@ -355,6 +460,41 @@ async def create_pull_request(
base=body.base,
)

# Capture proof snapshot at PR creation time.
try:
from codeframe.core.proof.ledger import (
init_proof_tables,
list_requirements,
save_pr_proof_snapshot,
)

init_proof_tables(workspace)
reqs = list_requirements(workspace)

gates_total = 0
gates_passed = 0
gate_breakdown: list[dict] = []
for req in reqs:
for ob in req.obligations:
gates_total += 1
passed = ob.status == "satisfied"
if passed:
gates_passed += 1
gate_breakdown.append({
"gate": ob.gate.value,
"status": ob.status,
})
Comment on lines +474 to +486
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This counts requirement obligations, not unique gates.

gates_total and gate_breakdown are built from every req.obligations entry, so the same PROOF9 gate can appear multiple times across requirements. That inflates the badge and produces repeated rows instead of a gate-by-gate snapshot. Aggregate by ob.gate before persisting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/pr_v2.py` around lines 474 - 486, The current loop over
reqs builds gates_total and gate_breakdown from every req.obligations entry,
causing duplicate gates; instead aggregate obligations by ob.gate: iterate reqs
and req.obligations to build a dict keyed by ob.gate (use ob.gate.value or
ob.gate as key) and compute a single status per gate (e.g., mark "passed" if any
obligation for that gate has status == "passed", otherwise the most severe
status), then set gates_total = number of unique keys and build gate_breakdown
from the aggregated dict; update the code paths that reference gates_total and
gate_breakdown accordingly.


save_pr_proof_snapshot(
workspace,
pr_number=pr.number,
gates_passed=gates_passed,
gates_total=gates_total,
gate_breakdown=gate_breakdown,
)
Comment on lines +463 to +494
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The snapshot is captured at PR creation, not at merge.

This stores proof data immediately after create_pull_request(), but the feature is exposing proof state for merged PR history. Any gates that change while the PR is open will make the history snapshot stale and misleading. Capture or refresh the snapshot in the merge flow instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/pr_v2.py` around lines 463 - 494, The snapshot is being
taken immediately after create_pull_request; instead, remove the proof-capture
block (init_proof_tables, list_requirements, gate counting and
save_pr_proof_snapshot) from the create PR flow and relocate it to the PR merge
flow so snapshots reflect final merged state; implement the same logic (call
init_proof_tables(workspace), compute gates_total/gates_passed/gate_breakdown
from list_requirements(workspace) and call save_pr_proof_snapshot(workspace,
pr_number=pr.number, ...)) in the merge handler (the function that performs PR
merge/merge_pull_request) so the proof snapshot is created/updated at merge time
rather than on create.

except Exception as snap_err:
logger.warning(f"Failed to save proof snapshot for PR #{pr.number}: {snap_err}")

return _pr_to_response(pr)

except GitHubAPIError as e:
Expand Down
110 changes: 110 additions & 0 deletions tests/core/test_proof_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Tests for pr_proof_snapshots ledger functions.

Verifies save_pr_proof_snapshot and get_pr_proof_snapshot work correctly
with the SQLite-backed proof ledger.
"""

import shutil
import tempfile
from pathlib import Path

import pytest

from codeframe.core.workspace import create_or_load_workspace

pytestmark = pytest.mark.v2


@pytest.fixture
def test_workspace(tmp_path):
    """Yield a fresh Workspace rooted in a pytest-managed temp directory.

    Uses pytest's built-in ``tmp_path`` fixture instead of hand-rolled
    ``tempfile.mkdtemp`` + ``shutil.rmtree`` bookkeeping: pytest creates a
    unique directory per test and cleans old ones up automatically, so no
    teardown code is needed.
    """
    workspace_path = tmp_path / "test_ws"
    workspace_path.mkdir(parents=True, exist_ok=True)

    yield create_or_load_workspace(workspace_path)


class TestPrProofSnapshot:
    """Tests for save/get pr_proof_snapshot functions."""

    def test_save_and_get_snapshot(self, test_workspace):
        """Round-trip a snapshot and check every stored field."""
        from codeframe.core.proof.ledger import (
            get_pr_proof_snapshot,
            init_proof_tables,
            save_pr_proof_snapshot,
        )

        init_proof_tables(test_workspace)

        breakdown = [
            {"gate": "unit_test", "status": "satisfied"},
            {"gate": "lint", "status": "failed"},
        ]
        save_pr_proof_snapshot(
            test_workspace,
            pr_number=42,
            gates_passed=1,
            gates_total=2,
            gate_breakdown=breakdown,
        )

        snapshot = get_pr_proof_snapshot(test_workspace, 42)

        assert snapshot is not None
        # Scalar fields come back exactly as saved.
        for field, expected in (
            ("pr_number", 42),
            ("gates_passed", 1),
            ("gates_total", 2),
        ):
            assert snapshot[field] == expected
        # The breakdown survives the JSON round-trip unchanged.
        assert snapshot["gate_breakdown"] == breakdown
        assert "snapshotted_at" in snapshot

    def test_get_nonexistent_snapshot_returns_none(self, test_workspace):
        """Looking up an unknown PR number yields None, not an error."""
        from codeframe.core.proof.ledger import (
            get_pr_proof_snapshot,
            init_proof_tables,
        )

        init_proof_tables(test_workspace)

        assert get_pr_proof_snapshot(test_workspace, 9999) is None

    def test_snapshot_overwrites_on_same_pr_number(self, test_workspace):
        """A second save for the same PR replaces the first snapshot."""
        from codeframe.core.proof.ledger import (
            get_pr_proof_snapshot,
            init_proof_tables,
            save_pr_proof_snapshot,
        )

        init_proof_tables(test_workspace)

        first = {
            "pr_number": 10,
            "gates_passed": 3,
            "gates_total": 5,
            "gate_breakdown": [{"gate": "unit_test", "status": "satisfied"}],
        }
        second = {
            "pr_number": 10,
            "gates_passed": 5,
            "gates_total": 5,
            "gate_breakdown": [
                {"gate": "unit_test", "status": "satisfied"},
                {"gate": "lint", "status": "satisfied"},
            ],
        }
        # Save twice with the same PR number; the later payload must win.
        for payload in (first, second):
            save_pr_proof_snapshot(test_workspace, **payload)

        stored = get_pr_proof_snapshot(test_workspace, 10)
        assert stored is not None
        assert stored["gates_passed"] == 5
        assert stored["gates_total"] == 5
        assert len(stored["gate_breakdown"]) == 2
Loading
Loading