-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconductor.py
More file actions
1903 lines (1607 loc) · 67.5 KB
/
conductor.py
File metadata and controls
1903 lines (1607 loc) · 67.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""Loqu8 Conductor — webhook listener + MCP server + Redis event bridge.
Receives GitHub webhook payloads from `gh webhook forward` on port 9999,
publishes structured events to Redis pub/sub, and exposes MCP tools for
AI agents to dispatch work, check status, and review results.
Usage:
python3 conductor.py # start on :9999
python3 conductor.py --port 9999 # explicit port
python3 conductor.py --dry-run # log without Redis
MCP tools (protocol):
acknowledge — Claim an issue and start working
update_progress — Post a milestone update
report_blocked — Flag issue as blocked + notify dispatcher
complete — Post summary and close issue
my_assignment — Get full context for your assigned issue
get_issue_context — Read any issue across repos
MCP tools (orchestration):
dispatch — Create a GitHub issue to trigger agent work
dashboard — All-in-one overview: health, agents, events
list_repos — Show monitored repos and directories
agent_status — Detailed status of a specific agent
tail_log — Read last N lines of an agent's log file
Redis channels:
conductor:issues — issue opened/labeled/closed events
conductor:comments — issue comment events (created/edited)
conductor:pull_requests — PR opened/closed events
conductor:check_runs — CI check run completed events
conductor:events — all events (firehose)
"""
import argparse
import json
import logging
import os
import subprocess
import sys
import threading
import time
from collections import deque
from datetime import datetime, timedelta, timezone
import redis
import uvicorn
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
# Root logging config: terse timestamps, INFO level for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("conductor")
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Path to the repo registry, resolved relative to this module's location.
_REPOS_FILE = os.path.join(os.path.dirname(__file__), "repos.json")
# Parsed repos.json cache plus the mtime it was read at — _load_repos()
# re-reads the file only when the mtime changes (cheap hot-reload).
_repos_raw_cache: dict[str, dict | str] = {}
_repos_mtime: float = 0
# Default repo config values
_REPO_DEFAULTS = {
    "branch_strategy": "feature-branch",
    "base_branch": "develop",
    "auto_merge": False,
}
def _load_repos() -> dict[str, dict | str]:
    """Return the raw contents of repos.json, re-reading on mtime change.

    Acts as a poor-man's hot reload: the file is parsed again only when
    its modification time differs from the last successful read. On any
    read/parse failure the previous cache is kept, and an error is
    logged only if no config has ever been loaded.
    """
    global _repos_raw_cache, _repos_mtime
    try:
        current_mtime = os.path.getmtime(_REPOS_FILE)
        if current_mtime != _repos_mtime:
            with open(_REPOS_FILE) as fh:
                _repos_raw_cache = json.load(fh)
            _repos_mtime = current_mtime
            log.info("Loaded %d repos from %s", len(_repos_raw_cache), _REPOS_FILE)
    except (FileNotFoundError, json.JSONDecodeError, OSError) as exc:
        if not _repos_raw_cache:
            log.error("Failed to load %s: %s", _REPOS_FILE, exc)
    return _repos_raw_cache
def _get_repo_dirs() -> dict[str, str]:
    """Map each monitored repo to its local working directory.

    Reads repos.json via _load_repos() (hot-reloaded). Two entry shapes
    are accepted:
      - Legacy: {"repo": "/path/to/dir"}
      - New:    {"repo": {"directory": "/path/to/dir", ...}}
    Entries of any other shape are logged and skipped.
    """
    dirs: dict[str, str] = {}
    for repo_name, entry in _load_repos().items():
        if isinstance(entry, str):
            dirs[repo_name] = entry
        elif isinstance(entry, dict):
            dirs[repo_name] = entry.get("directory", "")
        else:
            log.warning("Invalid config for %s in repos.json", repo_name)
    return dirs
def _get_repo_config(repo: str) -> dict:
    """Return the effective config for *repo* with defaults applied.

    Result keys: directory, branch_strategy, base_branch, auto_merge.
    An unknown repo yields an empty dict so callers can distinguish
    "not monitored" from "monitored with defaults".
    """
    entry = _load_repos().get(repo)
    if entry is None:
        return {}
    if isinstance(entry, str):
        # Legacy string entry: the value is just the directory.
        return {"directory": entry, **_REPO_DEFAULTS}
    # Dict entry: explicit keys override the defaults.
    return {**_REPO_DEFAULTS, **entry}
def _guess_repo_directory(repo: str) -> str:
    """Guess a local checkout path for *repo* ("org/name").

    Infers the org's base directory from any already-configured repo in
    the same org (the parent of that repo's directory) and appends the
    new repo's name. Falls back to ~/Projects/{org}/{name}.
    """
    if "/" in repo:
        org, name = repo.split("/", 1)
    else:
        org, name = "", repo
    # Look for a sibling repo from the same org to infer the base path.
    for known_repo, known_dir in _get_repo_dirs().items():
        known_org = known_repo.split("/", 1)[0] if "/" in known_repo else ""
        if known_org == org and known_dir:
            return os.path.join(os.path.dirname(known_dir), name)
    return os.path.expanduser(f"~/Projects/{org}/{name}")
# Eagerly load once at startup, then hot-reload on change
# Eagerly load at startup for import-time references (parse_issue_event etc.)
# All tool functions call _get_repo_dirs() directly for hot-reload.
# Webhook delivery deduplication (GitHub can re-deliver on timeout)
_seen_deliveries: deque = deque(maxlen=200)
CONDUCTOR_LABELS = {"conductor", "agent:opus", "agent:cursor", "agent:marketing"}
# Maximum dispatch depth to prevent infinite recursion (A dispatches to B dispatches to A...)
MAX_DISPATCH_DEPTH = 3
# Keyword → label mapping for auto-labeling in dispatch()
# Keys are matched case-insensitively against issue title and body.
KEYWORD_LABELS = {
# Bug indicators
"fix": "bug",
"bug": "bug",
"broken": "bug",
"crash": "bug",
"error": "bug",
"fail": "bug",
"wrong": "bug",
"regression": "bug",
# Feature indicators
"feat": "enhancement",
"add": "enhancement",
"implement": "enhancement",
"new": "enhancement",
"support": "enhancement",
# Documentation
"doc": "documentation",
"readme": "documentation",
"typo": "documentation",
# Refactoring / chores
"refactor": "refactor",
"clean": "refactor",
"rename": "refactor",
"move": "refactor",
# Testing
"test": "testing",
"spec": "testing",
"coverage": "testing",
}
# Labels that signal complexity → auto-add agent:opus
COMPLEX_KEYWORDS = {"refactor", "migrate", "redesign", "architect", "security", "performance"}
# Task-type keywords → internal routing hints (not posted as GitHub labels)
# These are included in the event's labels list so subscriber can match them
# in TASK_ROUTING, but filtered out before creating GitHub issues.
TASK_TYPE_KEYWORDS = {
"test": "task:test", "spec": "task:test", "coverage": "task:test",
"review": "task:review", "audit": "task:review",
"feat": "task:feature", "feature": "task:feature", "implement": "task:feature",
}
def infer_labels(title: str, body: str) -> list[str]:
    """Infer GitHub labels for an issue from keywords in its title and body.

    Title words are matched as whole words (higher signal); the body is
    matched by substring over its first 500 characters to limit false
    positives in long descriptions. Complexity keywords anywhere add
    `agent:opus`; task-type keywords add internal `task:*` routing hints
    (stripped before GitHub label creation). Labels come back
    deduplicated, in first-seen order.
    """
    labels: dict[str, None] = {}  # insertion-ordered set
    lowered_title = title.lower()
    lowered_body = (body or "")[:500].lower()
    words_in_title = set(lowered_title.split())

    # Whole-word matches against the title first (higher confidence)...
    for kw, label in KEYWORD_LABELS.items():
        if kw in words_in_title:
            labels[label] = None
    # ...then substring matches against the truncated body.
    for kw, label in KEYWORD_LABELS.items():
        if kw in lowered_body:
            labels[label] = None

    # Complexity anywhere escalates routing to the opus agent.
    combined = lowered_title + " " + lowered_body
    if any(kw in combined for kw in COMPLEX_KEYWORDS):
        labels["agent:opus"] = None

    # task:* hints are internal-only routing signals.
    for kw, hint in TASK_TYPE_KEYWORDS.items():
        if kw in words_in_title or kw in lowered_body:
            labels[hint] = None

    return list(labels)
# Recent events buffer (in-memory, last 100)
recent_events: deque = deque(maxlen=100)
# Active agents tracker (populated from Redis events, persisted to Redis hash)
active_agents: dict[str, dict] = {}  # key: "repo#number" → agent info
# Redis hash key under which active_agents is persisted across restarts.
ACTIVE_AGENTS_REDIS_KEY = "conductor:active_agents"
# Redis client (set in main); stays None in --dry-run mode, in which case
# publish()/persistence become no-ops.
redis_client: redis.Redis | None = None
def _persist_active_agents():
    """Write active_agents to a Redis hash so the tracker survives restarts.

    The hash is rewritten wholesale (delete, then hset one JSON-encoded
    field per agent) and refreshed with a 24h TTL so stale state expires
    if the conductor stays down. Best effort — never raises — but
    failures are logged instead of silently swallowed, so Redis outages
    remain visible.
    """
    if redis_client is None:
        return
    try:
        if active_agents:
            # Store each agent as a hash field (key → JSON).
            mapping = {k: json.dumps(v, default=str) for k, v in active_agents.items()}
            redis_client.delete(ACTIVE_AGENTS_REDIS_KEY)
            redis_client.hset(ACTIVE_AGENTS_REDIS_KEY, mapping=mapping)
        else:
            redis_client.delete(ACTIVE_AGENTS_REDIS_KEY)
        redis_client.expire(ACTIVE_AGENTS_REDIS_KEY, 86400)  # 24h TTL
    except Exception as e:
        # Persistence failure must not break the caller, but a bare
        # `pass` here previously hid Redis outages entirely.
        log.warning("Failed to persist active agents: %s", e)
def _restore_active_agents():
    """Repopulate active_agents from the Redis hash on startup.

    Tolerates a missing/empty hash and decodes byte keys (clients not
    using decode_responses). Failures are logged, never raised.
    """
    if redis_client is None:
        return
    try:
        stored = redis_client.hgetall(ACTIVE_AGENTS_REDIS_KEY)
        for raw_key, raw_value in stored.items():
            field = raw_key.decode() if isinstance(raw_key, bytes) else raw_key
            active_agents[field] = json.loads(raw_value)
        if active_agents:
            log.info("Restored %d active agent(s) from Redis", len(active_agents))
    except Exception as e:
        log.warning("Failed to restore active agents: %s", e)
# ---------------------------------------------------------------------------
# Redis helpers
# ---------------------------------------------------------------------------
def publish(channel: str, event: dict):
    """Publish *event* as JSON on a Redis pub/sub channel.

    Non-serializable values are stringified (default=str). Returns the
    subscriber count reported by Redis, or 0 when Redis is unconfigured
    (dry-run) or the connection fails.
    """
    if redis_client is None:
        return 0
    try:
        subscriber_count = redis_client.publish(channel, json.dumps(event, default=str))
    except redis.ConnectionError:
        log.error(" → Redis connection failed")
        return 0
    log.info(" → Published to %s (%d subscribers)", channel, subscriber_count)
    return subscriber_count
def parse_issue_event(payload: dict) -> dict:
    """Normalize a GitHub `issues` webhook payload into a flat event dict.

    Enriched with the repo's local directory/config and a flag marking
    whether any conductor-managed label is present.
    """
    issue = payload.get("issue", {})
    repository = payload.get("repository", {})
    full_name = repository.get("full_name", "unknown")
    label_names = [lbl.get("name", "") for lbl in issue.get("labels", [])]
    config = _get_repo_config(full_name)
    return {
        "type": "issue",
        "action": payload.get("action", "unknown"),
        "repo": full_name,
        "repo_dir": config.get("directory"),
        "repo_config": config,
        "issue_number": issue.get("number"),
        "issue_title": issue.get("title", ""),
        "issue_body": issue.get("body", ""),
        "issue_url": issue.get("html_url", ""),
        "labels": label_names,
        "has_conductor_label": bool(CONDUCTOR_LABELS & set(label_names)),
        "author": issue.get("user", {}).get("login", ""),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def parse_comment_event(payload: dict) -> dict:
    """Normalize a GitHub `issue_comment` webhook payload into a flat event dict.

    Carries both the parent issue's fields and the comment's own id,
    body, url, and author, plus repo config enrichment.
    """
    comment = payload.get("comment", {})
    issue = payload.get("issue", {})
    repository = payload.get("repository", {})
    full_name = repository.get("full_name", "unknown")
    label_names = [lbl.get("name", "") for lbl in issue.get("labels", [])]
    config = _get_repo_config(full_name)
    return {
        "type": "comment",
        "action": payload.get("action", "unknown"),
        "repo": full_name,
        "repo_dir": config.get("directory"),
        "repo_config": config,
        "issue_number": issue.get("number"),
        "issue_title": issue.get("title", ""),
        "issue_body": issue.get("body", ""),
        "issue_url": issue.get("html_url", ""),
        "comment_id": comment.get("id"),
        "comment_body": comment.get("body", ""),
        "comment_url": comment.get("html_url", ""),
        "comment_author": comment.get("user", {}).get("login", ""),
        "labels": label_names,
        "has_conductor_label": bool(CONDUCTOR_LABELS & set(label_names)),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def parse_pull_request_event(payload: dict) -> dict:
    """Normalize a GitHub `pull_request` webhook payload into a flat event dict.

    Includes head/base branch refs, author, and draft status alongside
    the usual repo config enrichment.
    """
    pr = payload.get("pull_request", {})
    repository = payload.get("repository", {})
    full_name = repository.get("full_name", "unknown")
    config = _get_repo_config(full_name)
    return {
        "type": "pull_request",
        "action": payload.get("action", "unknown"),
        "repo": full_name,
        "repo_dir": config.get("directory"),
        "repo_config": config,
        "pr_number": pr.get("number"),
        "pr_title": pr.get("title", ""),
        "pr_body": pr.get("body", ""),
        "pr_url": pr.get("html_url", ""),
        "pr_head": pr.get("head", {}).get("ref", ""),
        "pr_base": pr.get("base", {}).get("ref", ""),
        "pr_author": pr.get("user", {}).get("login", ""),
        "pr_draft": pr.get("draft", False),
        "labels": [lbl.get("name", "") for lbl in pr.get("labels", [])],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def parse_check_run_event(payload: dict) -> dict:
    """Normalize a GitHub `check_run` webhook payload into a flat event dict.

    Collects the PR numbers the check run is attached to so subscribers
    can correlate CI results with open pull requests.
    """
    check = payload.get("check_run", {})
    repository = payload.get("repository", {})
    full_name = repository.get("full_name", "unknown")
    # A check run may reference zero or more PRs; keep only real numbers.
    linked_prs = [p.get("number") for p in check.get("pull_requests", []) if p.get("number")]
    config = _get_repo_config(full_name)
    return {
        "type": "check_run",
        "action": payload.get("action", "unknown"),
        "repo": full_name,
        "repo_dir": config.get("directory"),
        "repo_config": config,
        "check_name": check.get("name", ""),
        "check_status": check.get("status", ""),
        "check_conclusion": check.get("conclusion", ""),
        "pr_numbers": linked_prs,
        "head_sha": check.get("head_sha", ""),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# ---------------------------------------------------------------------------
# Webhook HTTP routes (Starlette)
# ---------------------------------------------------------------------------
async def webhook_handler(request: Request):
    """Handle one GitHub webhook delivery (POSTed by `gh webhook forward`).

    Flow: parse JSON body → ack pings → dedupe by delivery id → dispatch
    by X-GitHub-Event type to the matching parser + Redis channel →
    buffer the event in-memory for the dashboard. Unknown event types go
    to the `conductor:events` firehose with minimal metadata. Always
    returns 200 for parseable bodies so GitHub does not retry.
    """
    try:
        payload = await request.json()
    except Exception:
        # Unparseable body — the only non-200 response this handler returns.
        return JSONResponse({"error": "invalid json"}, status_code=400)
    event_type = request.headers.get("X-GitHub-Event", "unknown")
    delivery_id = request.headers.get("X-GitHub-Delivery", "")
    # Silently ack pings — GitHub sends these frequently as keepalives
    if event_type == "ping":
        return JSONResponse({"ok": True})
    # Deduplicate webhook deliveries (GitHub retries on timeout)
    if delivery_id and delivery_id in _seen_deliveries:
        log.info("Duplicate delivery %s — skipping", delivery_id[:8])
        return JSONResponse({"ok": True, "duplicate": True})
    if delivery_id:
        _seen_deliveries.append(delivery_id)
    log.info("Received %s event (delivery: %s)", event_type, delivery_id[:8])
    if event_type == "issues":
        event = parse_issue_event(payload)
        log.info(
            " %s#%s [%s] %s (labels: %s)",
            event["repo"], event["issue_number"], event["action"],
            event["issue_title"],
            ", ".join(event["labels"]) or "none",
        )
        if event["has_conductor_label"]:
            log.info(" → CONDUCTOR LABEL DETECTED")
        # All issue events are published; subscribers filter by label.
        publish("conductor:issues", event)
        recent_events.appendleft(event)
    elif event_type == "issue_comment":
        event = parse_comment_event(payload)
        log.info(
            " %s#%s comment [%s] by %s (labels: %s)",
            event["repo"], event["issue_number"], event["action"],
            event["comment_author"],
            ", ".join(event["labels"]) or "none",
        )
        if event["has_conductor_label"]:
            log.info(" → CONDUCTOR LABEL on commented issue")
        publish("conductor:comments", event)
        recent_events.appendleft(event)
    elif event_type == "pull_request":
        event = parse_pull_request_event(payload)
        log.info(
            " %s PR #%s [%s] %s → %s by %s",
            event["repo"], event["pr_number"], event["action"],
            event["pr_head"], event["pr_base"], event["pr_author"],
        )
        publish("conductor:pull_requests", event)
        recent_events.appendleft(event)
    elif event_type == "check_run":
        event = parse_check_run_event(payload)
        # Only completed runs carry a conclusion worth publishing.
        if event["action"] == "completed":
            log.info(
                " %s check_run [%s] %s — %s (PRs: %s)",
                event["repo"], event["action"], event["check_name"],
                event["check_conclusion"],
                ", ".join(str(n) for n in event["pr_numbers"]) or "none",
            )
            publish("conductor:check_runs", event)
            recent_events.appendleft(event)
        # Ignore non-completed check_runs (created, in_progress)
    else:
        # Unknown event type: record a minimal envelope on the firehose.
        log.info(" %s event — firehose only", event_type)
        firehose = {
            "type": event_type,
            "delivery_id": delivery_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        publish("conductor:events", firehose)
        recent_events.appendleft(firehose)
    return JSONResponse({"ok": True})
async def health_handler(request: Request):
    """Liveness endpoint: service identity, Redis availability, monitored
    repos, and the size of the in-memory event buffer."""
    status = {
        "service": "conductor",
        "status": "running",
        "redis": redis_client is not None,
        "mcp": True,
        "repos": list(_get_repo_dirs().keys()),
        "recent_events": len(recent_events),
    }
    return JSONResponse(status)
# ---------------------------------------------------------------------------
# MCP Server — Instructions Template + Dynamic State
# ---------------------------------------------------------------------------
_INSTRUCTIONS_TEMPLATE = """Loqu8 Conductor — orchestrate work across repos.
{dynamic_section}
## Agent Protocol
When you are assigned an issue by the conductor, follow this protocol:
1. **Acknowledge**: Call `acknowledge()` — posts a comment and marks you active
2. **Read context**: Call `my_assignment()` — returns the full issue body, comments, and protocol state
3. **Work**: Implement the task. Post milestone updates with `update_progress(milestone)`
- Good milestones: approach decided, major step completed, commit pushed
- 2-4 updates for a typical task is right. Don't over-update.
4. **If blocked**: Call `report_blocked(reason)` — adds label, notifies dispatcher, stops work
5. **When done**: Call `complete(summary)` — posts summary, closes issue
## Tool Selection Guide
| Need | Tool |
|------|------|
| Claim your assigned issue | `acknowledge()` |
| Get full issue context | `my_assignment()` |
| Post a milestone update | `update_progress(milestone)` |
| Flag yourself as blocked | `report_blocked(reason)` |
| Close issue with summary | `complete(summary)` |
| Read any issue across repos | `get_issue_context(repo, number)` |
| Dispatch work to another repo | `dispatch(repo, title, body)` |
| See all active agents + health | `dashboard()` |
| Find stale/abandoned issues | `check_stale(hours?)` |
| List monitored repos | `list_repos()` |
| Add a repo to monitoring | `add_repo(repo, directory)` |
| Agent/reviewer performance data | `tracking(days?, repo?)` |
| Detailed status of one agent | `agent_status(repo, issue_number)` |
| Peek at agent's live log output | `tail_log(repo, issue_number, lines?)` |
## Protocol Tool Defaults
If you were spawned by the conductor, `repo` and `issue_number` are set automatically
from environment variables (`CONDUCTOR_AGENT_REPO`, `CONDUCTOR_AGENT_ISSUE`).
MCP-connected agents must pass them explicitly.
## Re-triggering Agents
To re-spawn an agent on an existing issue (e.g., after a stale session or failed run),
comment `/run` on the issue. Optional context after `/run` is passed to the new agent.
## MCP Staleness
If conductor tools return errors or the MCP session is stale (conductor was restarted),
fall back to direct gh CLI commands:
- Comment: `gh issue comment <number> --repo <repo> --body "<message>"`
- Close: `gh issue close <number> --repo <repo>`
- Re-trigger: comment `/run` on the issue
## Monitored Repos
{repos_section}
## Conductor Labels
Issues with these labels trigger agent work: {labels_list}
"""
def _build_dynamic_instructions() -> str:
    """Render the live-state section of the MCP instructions.

    Sections: active agents (non-terminal status) with elapsed minutes,
    the three most recent issue events, and Redis/buffer health.
    """
    out: list[str] = []

    # Agents whose status is not terminal count as active.
    running = [a for a in active_agents.values()
               if a.get("status") not in ("success", "failure")]
    if running:
        out.append(f"## Live State — {len(running)} Active Agent(s)\n")
        for agent in running:
            duration = ""
            started_at = agent.get("started", "")
            if started_at:
                try:
                    elapsed = datetime.now(timezone.utc) - datetime.fromisoformat(started_at)
                    minutes = int(elapsed.total_seconds() // 60)
                    duration = f" ({minutes}m)" if minutes > 0 else " (<1m)"
                except (ValueError, TypeError):
                    pass  # Unparseable timestamp — just omit the duration.
            out.append(f"- **{agent.get('repo', '?')}#{agent.get('issue_number', '?')}** "
                       f"— {agent.get('title', '?')}{duration}")
        out.append("")
    else:
        out.append("## Live State — No Active Agents\n")

    # recent_events is newest-first, so [:3] is the three latest.
    newest_issues = [e for e in recent_events if e.get("type") == "issue"][:3]
    if newest_issues:
        out.append("### Recent Events")
        for ev in newest_issues:
            out.append(f"- {ev.get('repo', '?')}#{ev.get('issue_number', '?')} "
                       f"[{ev.get('action', '?')}] {ev.get('issue_title', '')}")
        out.append("")

    # Health: a configured client that fails to ping counts as down.
    healthy = redis_client is not None
    try:
        if healthy:
            redis_client.ping()
    except Exception:
        healthy = False
    out.append(f"### Health\n- Redis: {'connected' if healthy else 'disconnected'}")
    out.append(f"- Events buffered: {len(recent_events)}")
    out.append("")
    return "\n".join(out)
def _build_repos_section() -> str:
    """Render the monitored-repos bullet list for the instructions."""
    entries = [f"- `{repo}` → `{directory}`"
               for repo, directory in _get_repo_dirs().items()]
    return "\n".join(entries)
def _format_instructions() -> str:
    """Fill the instructions template with live state, the repo list,
    and the sorted conductor label set."""
    label_list = ", ".join(f"`{name}`" for name in sorted(CONDUCTOR_LABELS))
    return _INSTRUCTIONS_TEMPLATE.format(
        dynamic_section=_build_dynamic_instructions(),
        repos_section=_build_repos_section(),
        labels_list=label_list,
    )
def _refresh_instructions() -> None:
    """Push freshly formatted instructions onto the running MCP server.

    Best effort: if formatting or assignment fails, the previously set
    instructions remain in place.
    """
    try:
        updated = _format_instructions()
        mcp._mcp_server.instructions = updated
    except Exception:
        pass  # Keep whatever instructions were set previously.
# The FastMCP server starts with placeholder live-state instructions;
# _refresh_instructions() swaps in the real dynamic section as events arrive.
mcp = FastMCP(
    "conductor",
    instructions=_INSTRUCTIONS_TEMPLATE.format(
        dynamic_section="## Live State — Starting up...\n\n",
        repos_section=_build_repos_section(),
        labels_list=", ".join(f"`{l}`" for l in sorted(CONDUCTOR_LABELS)),
    ),
)
# ---------------------------------------------------------------------------
# Helper: resolve repo/issue from args or env
# ---------------------------------------------------------------------------
def _resolve_repo_issue(
repo: str | None, issue_number: int | None
) -> tuple[str, int] | dict:
"""Resolve repo and issue_number from explicit args or env vars.
Returns (repo, issue_number) tuple or an error dict.
"""
repo = repo or os.environ.get("CONDUCTOR_AGENT_REPO")
issue_str = str(issue_number) if issue_number else os.environ.get("CONDUCTOR_AGENT_ISSUE")
if not repo or not issue_str:
return {"error": "repo and issue_number are required (not set in args or env vars "
"CONDUCTOR_AGENT_REPO / CONDUCTOR_AGENT_ISSUE)"}
try:
return (repo, int(issue_str))
except (ValueError, TypeError):
return {"error": f"Invalid issue_number: {issue_str}"}
def _ensure_labels(repo: str, labels: list[str]) -> None:
    """Create any missing labels in *repo* via `gh label create`.

    Best effort: gh exits non-zero when a label already exists, and any
    subprocess failure is deliberately ignored.
    """
    for name in labels:
        cmd = ["gh", "label", "create", name, "--repo", repo,
               "--description", "Conductor-managed", "--color", "5319E7"]
        try:
            subprocess.run(cmd, capture_output=True, text=True, timeout=15)
        except Exception:
            pass  # Label may already exist — gh returns non-zero, that's fine
def _gh_comment(repo: str, number: int, body: str) -> dict | None:
    """Post *body* as a comment on repo#number via the gh CLI.

    Returns None on success, or an {"error": ...} dict describing the
    failure (non-zero exit, timeout, or missing gh binary).
    """
    cmd = ["gh", "issue", "comment", str(number), "--repo", repo, "--body", body]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except subprocess.TimeoutExpired:
        return {"error": "gh issue comment timed out"}
    except FileNotFoundError:
        return {"error": "gh CLI not found"}
    if proc.returncode != 0:
        return {"error": f"gh comment failed: {proc.stderr.strip()}"}
    return None
# ---------------------------------------------------------------------------
# MCP Tools — Protocol (for spawned agents)
# ---------------------------------------------------------------------------
@mcp.tool()
def acknowledge(
    repo: str | None = None,
    issue_number: int | None = None,
) -> dict:
    """Acknowledge your assigned issue — call this first when you start working.

    Posts a comment on the issue and registers you as the active agent.
    Both arguments default from env vars when the conductor spawned you.

    Args:
        repo: Target repo (e.g., 'loqu8/ice'). Defaults from CONDUCTOR_AGENT_REPO env.
        issue_number: Issue number. Defaults from CONDUCTOR_AGENT_ISSUE env.
    """
    resolved = _resolve_repo_issue(repo, issue_number)
    if isinstance(resolved, dict):
        return resolved  # resolution error dict
    repo, number = resolved
    working_dir = _get_repo_dirs().get(repo, "unknown")
    failure = _gh_comment(repo, number, f"🤖 Agent picking this up. Working in `{working_dir}`.")
    if failure:
        return failure
    # Register as the active agent for this issue.
    agent_key = f"{repo}#{number}"
    active_agents[agent_key] = {
        "repo": repo,
        "issue_number": number,
        "started": datetime.now(timezone.utc).isoformat(),
        "status": "running",
        "milestones": 0,
    }
    log.info("Agent acknowledged: %s", agent_key)
    _refresh_instructions()
    return {"acknowledged": True, "repo": repo, "issue_number": number}
@mcp.tool()
def update_progress(
    milestone: str,
    repo: str | None = None,
    issue_number: int | None = None,
) -> dict:
    """Post a milestone update on your assigned issue.

    Good milestones: approach decided, major step completed, commit pushed.
    2-4 updates per task is typical — don't over-update.

    Args:
        milestone: Brief description of what was accomplished
        repo: Target repo. Defaults from CONDUCTOR_AGENT_REPO env.
        issue_number: Issue number. Defaults from CONDUCTOR_AGENT_ISSUE env.
    """
    resolved = _resolve_repo_issue(repo, issue_number)
    if isinstance(resolved, dict):
        return resolved
    repo, number = resolved
    failure = _gh_comment(repo, number, f"📍 {milestone}")
    if failure:
        return failure
    # Bump the milestone counter for the tracked agent, if any.
    agent_key = f"{repo}#{number}"
    tracked = active_agents.get(agent_key)
    if tracked is not None:
        tracked["milestones"] = tracked.get("milestones", 0) + 1
    log.info("Progress update on %s: %s", agent_key, milestone[:80])
    _refresh_instructions()
    return {"updated": True, "repo": repo, "issue_number": number, "milestone": milestone}
@mcp.tool()
def report_blocked(
    reason: str,
    repo: str | None = None,
    issue_number: int | None = None,
) -> dict:
    """Report that you are blocked on your assigned issue.

    Posts the reason as a comment, adds the 'blocked' label, and triggers
    a callback notification to the agent that dispatched this work (if any).
    After calling this, stop working — a human or dispatcher will help.

    Args:
        reason: What you tried and what's blocking you
        repo: Target repo. Defaults from CONDUCTOR_AGENT_REPO env.
        issue_number: Issue number. Defaults from CONDUCTOR_AGENT_ISSUE env.
    """
    resolved = _resolve_repo_issue(repo, issue_number)
    if isinstance(resolved, dict):
        return resolved
    repo, number = resolved
    failure = _gh_comment(repo, number, f"🚧 **Blocked**: {reason}")
    if failure:
        return failure
    # Best-effort label add — a failure here must not mask the comment.
    try:
        subprocess.run(
            ["gh", "issue", "edit", str(number), "--repo", repo, "--add-label", "blocked"],
            capture_output=True, text=True, timeout=15,
        )
    except Exception:
        pass
    agent_key = f"{repo}#{number}"
    if agent_key in active_agents:
        active_agents[agent_key]["status"] = "blocked"
    log.info("Agent blocked on %s: %s", agent_key, reason[:80])
    _refresh_instructions()
    return {"blocked": True, "repo": repo, "issue_number": number, "reason": reason}
@mcp.tool()
def complete(
    summary: str,
    repo: str | None = None,
    issue_number: int | None = None,
) -> dict:
    """Complete your assigned issue — post summary and close it.

    Posts a completion comment with your summary, then closes the issue.
    Call this when all work is done and verified.

    Args:
        summary: What you changed, which files, test results
        repo: Target repo. Defaults from CONDUCTOR_AGENT_REPO env.
        issue_number: Issue number. Defaults from CONDUCTOR_AGENT_ISSUE env.
    """
    resolved = _resolve_repo_issue(repo, issue_number)
    if isinstance(resolved, dict):
        return resolved
    repo, number = resolved
    failure = _gh_comment(repo, number, f"✅ **Done**\n\n{summary}")
    if failure:
        return failure
    # Close the issue via the gh CLI.
    try:
        close_proc = subprocess.run(
            ["gh", "issue", "close", str(number), "--repo", repo],
            capture_output=True, text=True, timeout=15,
        )
    except subprocess.TimeoutExpired:
        return {"error": "gh issue close timed out"}
    except FileNotFoundError:
        return {"error": "gh CLI not found"}
    if close_proc.returncode != 0:
        return {"error": f"Failed to close issue: {close_proc.stderr.strip()}"}
    # Mark the tracked agent as finished.
    agent_key = f"{repo}#{number}"
    if agent_key in active_agents:
        active_agents[agent_key]["status"] = "success"
        active_agents[agent_key]["completed"] = datetime.now(timezone.utc).isoformat()
    log.info("Agent completed %s: %s", agent_key, summary[:80])
    _refresh_instructions()
    return {"completed": True, "repo": repo, "issue_number": number, "summary": summary}
@mcp.tool()
def my_assignment(
    repo: str | None = None,
    issue_number: int | None = None,
) -> dict:
    """Get full context for your assigned issue.

    Returns the issue body, comments, labels, and your protocol state
    (milestones posted, time active). Call this after acknowledge() to
    understand what you need to do.

    Args:
        repo: Target repo. Defaults from CONDUCTOR_AGENT_REPO env.
        issue_number: Issue number. Defaults from CONDUCTOR_AGENT_ISSUE env.
    """
    resolved = _resolve_repo_issue(repo, issue_number)
    if isinstance(resolved, dict):
        return resolved
    repo, number = resolved
    # Fetch the issue as JSON via the gh CLI.
    try:
        proc = subprocess.run(
            ["gh", "issue", "view", str(number), "--repo", repo,
             "--json", "title,body,labels,state,comments,assignees"],
            capture_output=True, text=True, timeout=30,
        )
        if proc.returncode != 0:
            return {"error": f"Failed to fetch issue: {proc.stderr.strip()}"}
        data = json.loads(proc.stdout)
    except subprocess.TimeoutExpired:
        return {"error": "gh issue view timed out"}
    except (FileNotFoundError, json.JSONDecodeError) as e:
        return {"error": str(e)}
    # Attach this agent's protocol-tracking state, if any.
    tracked = active_agents.get(f"{repo}#{number}", {})
    return {
        "repo": repo,
        "issue_number": number,
        "title": data.get("title", ""),
        "body": data.get("body", ""),
        "labels": [lbl.get("name", "") for lbl in data.get("labels", [])],
        "state": data.get("state", ""),
        "comments": [
            {"author": c.get("author", {}).get("login", ""),
             "body": c.get("body", ""),
             "created": c.get("createdAt", "")}
            for c in data.get("comments", [])
        ],
        "protocol_state": {
            "status": tracked.get("status", "not_tracked"),
            "milestones": tracked.get("milestones", 0),
            "started": tracked.get("started"),
        },
    }
@mcp.tool()
def get_issue_context(
    repo: str,
    number: int,
) -> dict:
    """Read any issue across monitored repos — not just your assignment.

    Use this to check related issues, understand dependencies, or review
    work done by other agents.

    Args:
        repo: Target repo (e.g., 'loqu8/ice')
        number: Issue number
    """
    # Fetch the issue as JSON via the gh CLI.
    try:
        proc = subprocess.run(
            ["gh", "issue", "view", str(number), "--repo", repo,
             "--json", "title,body,labels,state,comments"],
            capture_output=True, text=True, timeout=30,
        )
        if proc.returncode != 0:
            return {"error": f"Failed to fetch issue: {proc.stderr.strip()}"}
        data = json.loads(proc.stdout)
    except subprocess.TimeoutExpired:
        return {"error": "gh issue view timed out"}
    except (FileNotFoundError, json.JSONDecodeError) as e:
        return {"error": str(e)}
    return {
        "repo": repo,
        "number": number,
        "title": data.get("title", ""),
        "body": data.get("body", ""),
        "labels": [lbl.get("name", "") for lbl in data.get("labels", [])],
        "state": data.get("state", ""),
        "comments": [
            {"author": c.get("author", {}).get("login", ""),
             "body": c.get("body", ""),
             "created": c.get("createdAt", "")}
            for c in data.get("comments", [])
        ],
    }
# ---------------------------------------------------------------------------
# MCP Tools — Orchestration
# ---------------------------------------------------------------------------
@mcp.tool()
def dispatch(
repo: str,
title: str,
body: str,
labels: list[str] | None = None,
callback_session: str | None = None,
dispatch_depth: int = 0,
) -> dict:
"""Create a GitHub issue in a target repo to dispatch agent work.
The issue is labeled with 'conductor' automatically. After creation,