forked from onlyphantom/llm-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproject_assignment_05.py
More file actions
190 lines (148 loc) · 6.9 KB
/
project_assignment_05.py
File metadata and controls
190 lines (148 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
This code shows analyze three companies, identified by its stock ticker. The agent is run three times in parallel, and deliver the result
comparisation from those three companies .
# Usage:
🤖: I'm a comparative financial research analyst. Enter three stock ticker from IDX's companies to compare.
😊: ADRO BBCA BREN
"""
import asyncio
import os
from dotenv import load_dotenv
from agents import Agent, Runner, ItemHelpers, function_tool, trace
from utils.api_client import retrieve_from_endpoint
from typing import List
from datetime import datetime
import os.path
load_dotenv()
@function_tool
def get_company_financials(ticker: str) -> str:
"""
Get company financials from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/company/report/{ticker}/?sections=financials"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
@function_tool
def get_revenue_segments(ticker: str) -> str:
"""
Get revenue segments for a company from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/company/get-segments/{ticker}/"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
@function_tool
def get_quarterly_financials(ticker: str) -> str:
"""
Get revenue segments for a company from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/financials/quarterly/{ticker}/?report_date=2024-12-31&approx=true"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
financial_research_agent = Agent(
name="financial_research_agent",
instructions="""You are a financial research analyst. Research the given company ticker and provide comprehensive financial analysis including:
1. Company financials and performance metrics
2. Revenue breakdown and business segments
3. Quarterly financial trends
Provide a detailed analysis that can be compared with other companies.""",
tools=[get_company_financials, get_revenue_segments, get_quarterly_financials],
output_type=str
)
research_team_leader_aggregator = Agent(
name="research_team_leader_aggregator",
instructions="You are the team leader of a research team. You will aggregate the results from these agents and provide a consolidated answer that is relevant to the user.",
output_type=str
)
async def save_comparison_to_file(content: str, tickers: List[str]):
"""Save comparison report to timestamped file in comparison_output folder"""
try:
# Create output directory if it doesn't exist
output_dir = "comparison_output"
os.makedirs(output_dir, exist_ok=True)
# Generate timestamp filename
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
ticker_str = '_vs_'.join(tickers)
filename = f"comparison_{ticker_str}_{timestamp}.txt"
filepath = os.path.join(output_dir, filename)
# Save content to file
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
print(f"\n📁 Report saved to: {filepath}")
except Exception as e:
print(f"❌ Error saving report: {e}")
async def main():
input_prompt = input(f"🤖: I'm a comparative financial research analyst. Enter 2-4 stock tickers from IDX's companies to compare (comma or space separated): \n😊: ")
# Parse and validate input
try:
# Handle both comma-separated and space-separated inputs
if ',' in input_prompt:
tickers = [ticker.strip().upper() for ticker in input_prompt.split(',')]
else:
tickers = [ticker.strip().upper() for ticker in input_prompt.split()]
tickers = [ticker for ticker in tickers if ticker] # Remove empty strings
if len(tickers) < 2 or len(tickers) > 4:
print(f"❌ Error: Please enter between 2 and 4 tickers. You provided {len(tickers)}.")
return
# Validate ticker format (basic validation)
for ticker in tickers:
if not ticker.isalpha() or len(ticker) < 3 or len(ticker) > 6:
print(f"❌ Error: '{ticker}' doesn't look like a valid stock ticker. Tickers should be 3-6 letters.")
return
except Exception as e:
print(f"❌ Error parsing input: {e}")
return
print(f"🔍 Analyzing companies: {', '.join(tickers)}...")
# Ensure the entire workflow is a single trace
with trace("Parallelization"):
try:
# Run the same agent for all tickers in parallel
results = await asyncio.gather(
*[Runner.run(financial_research_agent, ticker) for ticker in tickers],
return_exceptions=True
)
# Check for exceptions
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"❌ Error analyzing {tickers[i]}: {result}")
results[i] = f"Analysis failed for {tickers[i]}: {str(result)}"
# Extract outputs
outputs = []
for i, result in enumerate(results):
if not isinstance(result, Exception):
output_text = "\n".join(ItemHelpers.text_message_outputs(result.new_items))
outputs.append(f"=== ANALYSIS FOR {tickers[i]} ===\n{output_text}")
# Aggregate the results
aggregated_result = "\n\n".join(outputs)
# Create comparison analysis
comparison_prompt = f"""You are a senior financial analyst. Compare these {len(tickers)} companies based on the research provided:
{aggregated_result}
Provide a comprehensive comparison covering:
1. Financial performance comparison
2. Revenue structure differences
3. Growth trends and prospects
4. Investment considerations and risks
5. Relative strengths and weaknesses
6. Ranking and recommendation summary
Be specific and provide actionable insights. Include a clear ranking from most recommended to least recommended investment."""
comparison_result = await Runner.run(
research_team_leader_aggregator,
comparison_prompt
)
# Format the final output
final_output = f"COMPARATIVE ANALYSIS - {', '.join(tickers)}\n{'='*60}\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{comparison_result.final_output}"
print(f"\n🤖: {final_output}")
# Save to file
await save_comparison_to_file(final_output, tickers)
except Exception as e:
print(f"❌ Error during analysis: {e}")
if __name__ == "__main__":
asyncio.run(main())