Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 81 additions & 23 deletions code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,62 @@ def _to_slug(s: str) -> str:
# ---------------------------------------------------------------------------


def _compute_cohesion_batch(
    community_member_qns: list[set[str]],
    all_edges: list[GraphEdge],
) -> list[float]:
    """Compute cohesion for many communities in one pass over the edges.

    A reverse ``qualified_name -> community index`` map is built first
    (valid because callers always pass disjoint member sets, i.e. a
    partition), after which every edge is visited exactly once and
    tallied as internal or external for whichever community (or
    communities) it touches.

    Total cost is O(edges + sum(|members|)) instead of the
    O(edges * communities) of calling the single-community helper in a
    loop.

    Returns a list of cohesion scores, index-aligned with
    ``community_member_qns``; a community touched by no edges scores 0.0.
    """
    owner: dict[str, int] = {
        qn: idx
        for idx, members in enumerate(community_member_qns)
        for qn in members
    }

    size = len(community_member_qns)
    inside = [0] * size
    outside = [0] * size

    for edge in all_edges:
        src = owner.get(edge.source_qualified)
        dst = owner.get(edge.target_qualified)
        if src is None and dst is None:
            # Neither endpoint belongs to a tracked community.
            continue
        if src is not None and src == dst:
            # Both endpoints live in the same community: internal edge.
            inside[src] += 1
            continue
        # Endpoints straddle a community boundary (or one endpoint is
        # untracked): external for whichever side(s) we track.
        if src is not None:
            outside[src] += 1
        if dst is not None:
            outside[dst] += 1

    scores: list[float] = []
    for inc, out in zip(inside, outside):
        touched = inc + out
        scores.append(inc / touched if touched else 0.0)
    return scores


def _compute_cohesion(
    member_qns: set[str], all_edges: list[GraphEdge]
) -> float:
    """Compute cohesion: internal_edges / (internal_edges + external_edges).

    An edge is *internal* when both endpoints are in ``member_qns`` and
    *external* when exactly one endpoint is; edges touching no member are
    ignored. Returns 0.0 when the community touches no edges at all.

    For multiple communities, prefer :func:`_compute_cohesion_batch`, which
    runs in O(edges) total instead of O(edges) per community. This wrapper
    simply delegates to the batch helper with a single-community partition.
    """
    # NOTE: the pre-refactor inline loop that previously lived here was
    # removed — it shadowed this delegation and made the batch helper the
    # only source of truth impossible to keep consistent.
    return _compute_cohesion_batch([member_qns], all_edges)[0]


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -220,16 +258,22 @@ def _detect_leiden(
weights="weight",
)

# Build communities from partition
communities: list[dict[str, Any]] = []
# Build communities from partition. Collect member sets first so we
# can batch-compute all cohesions in a single O(edges) pass below.
pending: list[tuple[list[GraphNode], set[str]]] = []
for cluster_ids in partition:
if len(cluster_ids) < min_size:
continue
members = [idx_to_node[i] for i in cluster_ids if i in idx_to_node]
if len(members) < min_size:
continue
member_qns = {m.qualified_name for m in members}
cohesion = _compute_cohesion(member_qns, edges)
pending.append((members, member_qns))

cohesions = _compute_cohesion_batch([p[1] for p in pending], edges)

communities: list[dict[str, Any]] = []
for (members, member_qns), cohesion in zip(pending, cohesions):
lang_counts = Counter(m.language for m in members if m.language)
dominant_lang = lang_counts.most_common(1)[0][0] if lang_counts else ""
name = _generate_community_name(members)
Expand Down Expand Up @@ -308,15 +352,20 @@ def _detect_leiden_sub(
weights="weight",
)

subs: list[dict[str, Any]] = []
for idx, cluster_ids in enumerate(partition):
pending: list[tuple[list[GraphNode], set[str]]] = []
for cluster_ids in partition:
if len(cluster_ids) < min_size:
continue
members = [idx_to_node[i] for i in cluster_ids if i in idx_to_node]
if len(members) < min_size:
continue
member_qns = {m.qualified_name for m in members}
cohesion = _compute_cohesion(member_qns, edges)
pending.append((members, member_qns))

cohesions = _compute_cohesion_batch([p[1] for p in pending], edges)

subs: list[dict[str, Any]] = []
for (members, member_qns), cohesion in zip(pending, cohesions):
lang_counts = Counter(m.language for m in members if m.language)
dominant_lang = lang_counts.most_common(1)[0][0] if lang_counts else ""
name = _generate_community_name(members)
Expand Down Expand Up @@ -348,12 +397,21 @@ def _detect_file_based(
for n in nodes:
by_file[n.file_path].append(n)

communities: list[dict[str, Any]] = []
# Pre-filter to communities meeting min_size and collect their member
# sets so we can batch-compute all cohesions in a single O(edges) pass.
# Without this, per-community cohesion is O(edges * files), which makes
# community detection effectively hang on large repos.
pending: list[tuple[str, list[GraphNode], set[str]]] = []
for file_path, members in by_file.items():
if len(members) < min_size:
continue
member_qns = {m.qualified_name for m in members}
cohesion = _compute_cohesion(member_qns, edges)
pending.append((file_path, members, member_qns))

cohesions = _compute_cohesion_batch([p[2] for p in pending], edges)

communities: list[dict[str, Any]] = []
for (file_path, members, member_qns), cohesion in zip(pending, cohesions):
lang_counts = Counter(m.language for m in members if m.language)
dominant_lang = lang_counts.most_common(1)[0][0] if lang_counts else ""
name = _generate_community_name(members)
Expand Down
181 changes: 181 additions & 0 deletions tests/test_communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from code_review_graph.communities import (
IGRAPH_AVAILABLE,
_compute_cohesion,
_compute_cohesion_batch,
_detect_file_based,
_generate_community_name,
detect_communities,
Expand Down Expand Up @@ -249,6 +250,186 @@ def test_cohesion_no_edges(self):
cohesion = _compute_cohesion(member_qns, [])
assert cohesion == pytest.approx(0.0)

def test_compute_cohesion_batch_matches_single(self):
    """Batch cohesion must produce identical results to calling
    _compute_cohesion once per community. Regression guard for the
    O(files * edges) -> O(edges) refactor.
    """
    # Local factory keeps the fixture table readable.
    def edge(eid: int, src: str, tgt: str, fp: str, line: int) -> GraphEdge:
        return GraphEdge(
            id=eid, kind="CALLS", source_qualified=src,
            target_qualified=tgt, file_path=fp, line=line, extra={},
        )

    edges = [
        edge(1, "a::f1", "a::f2", "a.py", 1),  # internal to comm_a
        edge(2, "a::f1", "b::g1", "a.py", 2),  # a <-> b: external to both
        edge(3, "b::g1", "b::g2", "b.py", 3),  # internal to comm_b
        edge(4, "b::g1", "c::h1", "b.py", 4),  # half-in: external to b only
        edge(5, "c::h1", "d::k1", "c.py", 5),  # untracked on both ends
    ]
    comm_a = {"a::f1", "a::f2"}
    comm_b = {"b::g1", "b::g2"}

    batch = _compute_cohesion_batch([comm_a, comm_b], edges)

    # Must agree exactly with the per-community helper.
    assert batch == [
        _compute_cohesion(comm_a, edges),
        _compute_cohesion(comm_b, edges),
    ]
    # Sanity: comm_a = 1 internal / 2 total, comm_b = 1 internal / 3 total.
    assert batch[0] == pytest.approx(0.5)
    assert batch[1] == pytest.approx(1 / 3)

def test_compute_cohesion_batch_empty(self):
    """An empty community list yields an empty result list."""
    result = _compute_cohesion_batch([], [])
    assert result == []

def test_compute_cohesion_batch_no_edges(self):
    """With zero edges, every community scores exactly 0.0."""
    assert _compute_cohesion_batch([{"a"}, {"b", "c"}], []) == [0.0, 0.0]

def test_detect_file_based_integration(self):
    """End-to-end: _detect_file_based produces correct member sets and
    cohesion values on a hand-built fixture with asymmetric cohesions.

    Guards the batch-cohesion refactor against zip misalignment, wrong
    member_qns passed to the batch helper, and member/cohesion drift.
    Cohesions are deliberately distinct (1.0 vs 0.6667) so a swap would
    fail the assertions.
    """
    # Fixture factories: minimal node/edge records whose qualified names
    # follow the "<file>::<name>" convention the cohesion helpers key on.
    def mk_node(nid: int, name: str, fp: str) -> GraphNode:
        return GraphNode(
            id=nid, kind="Function", name=name,
            qualified_name=f"{fp}::{name}",
            file_path=fp, line_start=1, line_end=10, language="python",
            parent_name=None, params=None, return_type=None, is_test=False,
            file_hash="h", extra={},
        )

    def mk_edge(eid: int, src: str, tgt: str, fp: str) -> GraphEdge:
        return GraphEdge(
            id=eid, kind="CALLS", source_qualified=src,
            target_qualified=tgt, file_path=fp, line=1, extra={},
        )

    nodes = [
        mk_node(1, "login", "auth.py"),
        mk_node(2, "logout", "auth.py"),
        mk_node(3, "check_token", "auth.py"),
        mk_node(4, "connect", "db.py"),
        mk_node(5, "query", "db.py"),
        mk_node(6, "close", "db.py"),
    ]
    edges = [
        # auth.py: 2 internal, 0 external -> cohesion 1.0
        mk_edge(1, "auth.py::login", "auth.py::check_token", "auth.py"),
        mk_edge(2, "auth.py::logout", "auth.py::check_token", "auth.py"),
        # db.py: 2 internal, 1 external -> cohesion 2/3 ≈ 0.6667
        mk_edge(3, "db.py::query", "db.py::connect", "db.py"),
        mk_edge(4, "db.py::close", "db.py::connect", "db.py"),
        mk_edge(5, "db.py::close", "external.py::log", "db.py"),
    ]

    result = _detect_file_based(nodes, edges, min_size=2)

    assert len(result) == 2
    by_desc = {c["description"]: c for c in result}
    auth = by_desc["File-based community: auth.py"]
    db = by_desc["File-based community: db.py"]

    # Member sets — catches wrong member_qns being passed to batch helper
    assert set(auth["members"]) == {
        "auth.py::login", "auth.py::logout", "auth.py::check_token",
    }
    assert set(db["members"]) == {
        "db.py::connect", "db.py::query", "db.py::close",
    }

    # Cohesions are distinct — zip misalignment would swap these
    # NOTE(review): 0.6667 only matches if the stored cohesion is rounded
    # to 4 decimals (the sibling direct-computation test rounds likewise);
    # pytest.approx's default tolerance would reject raw 2/3 — confirm
    # _detect_file_based rounds before storing.
    assert auth["cohesion"] == pytest.approx(1.0)
    assert db["cohesion"] == pytest.approx(0.6667)

    # Metadata passes through correctly
    assert auth["size"] == 3
    assert db["size"] == 3
    assert auth["dominant_language"] == "python"
    assert db["dominant_language"] == "python"
    assert auth["level"] == 0
    assert db["level"] == 0

def test_detected_cohesions_match_direct_computation(self):
    """Every stored community cohesion must equal what _compute_cohesion
    produces when called directly on that community's member set and
    the full edge list.

    Algorithm-agnostic: runs against whichever path detect_communities
    takes (Leiden if igraph is available, file-based otherwise). Any
    regression in the batch-cohesion refactor that mis-aligns
    cohesions to communities would fail loudly here with specific
    community names.

    The fixture is deliberately broken out of symmetry (one extra
    internal edge in auth.py) so a swap between auth/db cohesions
    would be visible.
    """
    self._seed_two_clusters()
    # Break cohesion symmetry: add one extra internal edge in auth.py
    # so auth.py cohesion != db.py cohesion. Without this, the seeded
    # fixture has both communities at 2/3 and a zip misalignment
    # would be silent.
    self.store.upsert_edge(EdgeInfo(
        kind="CALLS", source="auth.py::login",
        target="auth.py::logout", file_path="auth.py", line=12,
    ))
    self.store.commit()

    communities = detect_communities(self.store, min_size=2)
    assert len(communities) > 0

    all_edges = self.store.get_all_edges()
    # Collect the distinct cohesion values we see, to guard against
    # the degenerate case where the fixture somehow produces all-equal
    # cohesions (which would make a swap undetectable).
    seen_cohesions: set[float] = set()
    for comm in communities:
        # Sub-communities (level=1) have cohesion computed against
        # a filtered sub-edge set, so skip them. The fixture is tiny
        # enough that no sub-communities are produced in practice.
        if comm.get("level", 0) != 0:
            continue
        member_qns = set(comm["members"])
        # Round to 4 places to match the precision at which detection
        # stores cohesion values.
        direct = round(_compute_cohesion(member_qns, all_edges), 4)
        assert comm["cohesion"] == direct, (
            f"Community {comm['name']!r} stored cohesion "
            f"{comm['cohesion']} but direct computation gives {direct}"
        )
        seen_cohesions.add(comm["cohesion"])

    # Sanity: the fixture produced communities with distinct cohesions,
    # so the equality check above actually guards against swaps.
    assert len(seen_cohesions) >= 2, (
        "Fixture regression: all detected communities have the same "
        "cohesion, which means a zip misalignment bug would not be "
        f"caught here. seen={seen_cohesions}"
    )

def test_get_communities_sort_by(self):
"""get_communities respects sort_by parameter."""
self._seed_two_clusters()
Expand Down