-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdeep_research.py
More file actions
187 lines (156 loc) · 6.73 KB
/
deep_research.py
File metadata and controls
187 lines (156 loc) · 6.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Deep research: parallel cache-safe forks investigate a topic, then synthesize.
Usage:
python examples/deep_research.py "Why did Rome fall?"
python examples/deep_research.py # uses default topic
"""
from __future__ import annotations
import asyncio
import os
import re
import sys
import time
from dotenv import load_dotenv
load_dotenv()
from agentcache import AgentSession, ForkPolicy, ForkResult, LiteLLMSDKProvider, Usage
# System prompt shared by the planning turn and every fork: analyst persona,
# research methodology, and output style, followed by 30 numbered copies
# (0-29) of one reference principle.
# NOTE(review): the repeated block presumably exists to make the shared prefix
# long enough for provider-side prompt caching to kick in — confirm.
SYSTEM_PROMPT = "".join(
    (
        "You are a senior research analyst. You break complex questions into "
        "distinct investigation angles, then produce structured reports with "
        "evidence-backed findings.\n\n"
        "Research methodology:\n"
        "1. Identify 3-4 orthogonal angles that together cover the question\n"
        "2. For each angle, gather key facts, competing theories, and evidence\n"
        "3. Synthesize findings into a coherent narrative with confidence levels\n"
        "4. Flag unresolved questions and areas needing further investigation\n\n"
        "Output style: clear, dense, no filler. Use bullet points for findings. "
        "Cite specific names, dates, and mechanisms rather than vague summaries.\n\n",
        *(
            f"Reference principle #{n}: In multi-agent research, each worker should "
            "investigate one focused angle and return a structured report. Leaders "
            "consume reports, never raw worker transcripts. Cache-safe forks share "
            "the parent prefix so parallel workers benefit from prompt caching. "
            "Workers should be ephemeral: skip cache writes for one-shot helpers. "
            f"(principle {n})\n"
            for n in range(30)
        ),
    )
)
# Research question used when no topic is supplied on the command line.
DEFAULT_TOPIC = (
    "What are the most promising approaches to extending human healthspan "
    "beyond 120 years?"
)
async def main() -> None:
    """Plan, research in parallel, synthesize, then print the report and stats.

    Pipeline:
        1. Ask the session to break the topic into numbered angles.
        2. Fork one worker per angle and run all of them concurrently.
        3. Fork once more to synthesize the worker reports into a brief.
        4. Print the brief, followed by token usage and cache statistics.

    The topic is ``sys.argv[1]`` when present, otherwise ``DEFAULT_TOPIC``.
    The model is picked from the environment: Gemini when ``GEMINI_API_KEY``
    is set, else OpenAI when ``OPENAI_API_KEY`` is set. If neither key is set,
    or no angles can be parsed from the plan, a message is printed and the
    function returns without doing any further work.
    """
    topic = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_TOPIC

    if os.getenv("GEMINI_API_KEY"):
        model = "gemini/gemini-2.5-flash"
    elif os.getenv("OPENAI_API_KEY"):
        model = "gpt-4o-mini"
    else:
        print("Set GEMINI_API_KEY or OPENAI_API_KEY in .env")
        return

    provider = LiteLLMSDKProvider()
    session = AgentSession(
        model=model,
        provider=provider,
        system_prompt=SYSTEM_PROMPT,
    )

    print(f"Model: {model}")
    print(f"Topic: {topic}")
    print(f"System prompt: ~{len(SYSTEM_PROMPT):,} chars")
    print("=" * 60)

    # --- Step 1: Plan the research ---
    print("\n[1/4] Planning research angles...")
    # perf_counter is monotonic, so elapsed times cannot go negative or jump
    # if the system clock is adjusted mid-run.
    t0 = time.perf_counter()
    plan_response = await session.respond(
        "Break this research question into 3-4 distinct investigation angles. "
        "For each angle, write one line: the angle name and a one-sentence scope.\n\n"
        f"Question: {topic}\n\n"
        "Format each angle as: '1. Name: scope'"
    )
    plan_text = plan_response.text
    print(f" ({time.perf_counter() - t0:.1f}s)")
    print(f"\n{plan_text}\n")

    angles = _parse_angles(plan_text)
    if not angles:
        print("Could not parse angles from plan. Raw output above.")
        return
    print(f"Parsed {len(angles)} angles.")

    # --- Step 2: Parallel worker forks ---
    print(f"\n[2/4] Launching {len(angles)} parallel workers...")
    t1 = time.perf_counter()

    async def run_worker(angle: str) -> ForkResult:
        """Fork the session to investigate a single angle and return its result."""
        prompt = (
            "Investigate this specific angle of the research question.\n\n"
            f"Original question: {topic}\n"
            f"Your angle: {angle}\n\n"
            "Return a structured report with:\n"
            "- 3-5 key findings (bullet points, specific facts)\n"
            "- Competing theories or open debates\n"
            "- Confidence level (high/medium/low) with brief justification\n\n"
            "Be specific. Cite names, dates, mechanisms."
        )
        return await session.fork(
            prompt=prompt,
            policy=ForkPolicy.cache_safe_ephemeral(),
        )

    # gather() returns results in the order the awaitables were passed, so
    # results[i] always belongs to angles[i] regardless of completion order.
    results: list[ForkResult] = await asyncio.gather(
        *(run_worker(angle) for angle in angles)
    )
    elapsed_workers = time.perf_counter() - t1

    total_worker_usage = Usage()
    for number, (angle, result) in enumerate(zip(angles, results), start=1):
        print(f"\n Worker {number}: {angle[:60]}...")
        print(f" Tokens: {result.usage.total_tokens:,} | Cache read: {result.usage.cache_read_input_tokens:,}")
        total_worker_usage = total_worker_usage + result.usage
    print(f"\n All workers done in {elapsed_workers:.1f}s")
    print(f" Total worker tokens: {total_worker_usage.total_tokens:,}")

    # --- Step 3: Synthesis fork ---
    print("\n[3/4] Synthesizing findings...")
    t2 = time.perf_counter()
    # Only each worker's final text is handed to the synthesis fork.
    worker_reports = "\n\n---\n\n".join(
        f"## Angle {number}: {angle}\n\n{result.final_text}"
        for number, (angle, result) in enumerate(zip(angles, results), start=1)
    )
    synthesis = await session.fork(
        prompt=(
            f"You have received reports from {len(angles)} research workers "
            "investigating different angles of this question:\n\n"
            f"Question: {topic}\n\n"
            f"Worker reports:\n\n{worker_reports}\n\n"
            "Synthesize these into a final research brief:\n"
            "1. Executive summary (3-4 sentences)\n"
            "2. Key findings across all angles\n"
            "3. Emerging consensus and open debates\n"
            "4. Recommended next steps for deeper investigation"
        ),
        policy=ForkPolicy.cache_safe_ephemeral(),
    )
    print(f" ({time.perf_counter() - t2:.1f}s)")

    # --- Step 4: Final report ---
    print("\n" + "=" * 60)
    print("RESEARCH REPORT")
    print("=" * 60)
    print(f"\nTopic: {topic}\n")
    print(synthesis.final_text)

    # --- Stats ---
    total_usage = plan_response.usage + total_worker_usage + synthesis.usage
    status = session.cache_status()
    print("\n" + "=" * 60)
    print("STATS")
    print("=" * 60)
    print(f" Workers: {len(angles)}")
    print(f" Total input tokens: {total_usage.input_tokens:,}")
    print(f" Total output tokens:{total_usage.output_tokens:,}")
    print(f" Cache read tokens: {total_usage.cache_read_input_tokens:,}")
    print(f" Total tokens: {total_usage.total_tokens:,}")
    print(f" Cache hit rate: {status.hit_rate_recent:.1%}")
    # Wall time is measured from the start of planning (t0), not process start.
    print(f" Wall time: {time.perf_counter() - t0:.1f}s")

    explanation = session.explain_last_cache_break()
    if explanation:
        print(f"\n Cache break: {explanation.pretty()}")
def _parse_angles(text: str) -> list[str]:
    """Extract numbered items like '1. Name: scope' from LLM output.

    A line counts as an item when, after trimming, it starts with an integer
    followed by ``.`` or ``)`` and some non-empty text. Markdown decoration in
    front of the number (heading ``#``, quote ``>``, bullets ``-``/``*``/``+``,
    emphasis ``*``/``_``) is tolerated, so ``**1. Name:** scope`` and
    ``- 1. Name: scope`` are recognized too. A leading decimal number such as
    ``1.5 billion ...`` is not treated as a list marker.

    Args:
        text: Raw model output, possibly containing non-list lines.

    Returns:
        The item texts in order of appearance, without the numbering and
        without surrounding whitespace. Empty when no line matches.
    """
    # A single character class for the decoration (no nested quantifiers)
    # keeps matching linear even on long marker runs like a '-----' rule.
    # (?!\d) rejects decimals: in "1.5 billion" the "1." is not a list marker.
    numbered_item = re.compile(r"^([#>*_+\-\s]*)\d+[.)](?!\d)\s*(.+)")
    # Emphasis opened immediately before the number, e.g. the "**" in "**1.".
    emphasis_opener = re.compile(r"(\*+|_+)$")

    angles: list[str] = []
    for line in text.strip().splitlines():
        match = numbered_item.match(line.strip())
        if match is None:
            continue
        decoration, body = match.groups()
        opener = emphasis_opener.search(decoration)
        if opener:
            # "**1. Name:** scope" closes the emphasis inside the item text;
            # drop that one dangling closer so the angle reads cleanly.
            body = body.replace(opener.group(1), "", 1)
        body = body.strip()
        if body:
            angles.append(body)
    return angles
if __name__ == "__main__":
    # Executed as a script: run the async pipeline to completion.
    asyncio.run(main())