-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcollect_data_url.py
More file actions
162 lines (144 loc) · 5.54 KB
/
collect_data_url.py
File metadata and controls
162 lines (144 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import github3
import requests
import logging
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
# Configure logging: INFO and above, each line prefixed with a timestamp
# and the level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# GitHub API access configuration
def get_github_token():
    """Return the access token stored in ``~/.github_token``.

    Returns:
        The file's contents with surrounding whitespace stripped, or
        ``None`` when the file is missing, is not a regular file, or
        contains only whitespace.
    """
    token_file = Path.home() / '.github_token'
    # is_file() rather than exists(): a directory with that name would make
    # read_text() raise instead of reporting "no token".
    if not token_file.is_file():
        return None
    token = token_file.read_text(encoding='utf-8').strip()
    # A blank file is treated the same as a missing one.
    return token or None
def parse_github_url(url):
    """Extract the ``(owner, repo)`` pair from a GitHub repository URL.

    Args:
        url: A URL such as ``https://github.com/owner/repo``. Extra path
            segments (e.g. ``/tree/main``), trailing or doubled slashes and
            a ``.git`` suffix on the repository name are tolerated.

    Returns:
        A ``(owner, repo)`` tuple of strings.

    Raises:
        ValueError: If the host is not ``github.com`` / ``www.github.com``
            or the path does not contain both an owner and a repository
            name. Any error is logged before being re-raised.
    """
    try:
        parsed_url = urlparse(url)
        # hostname is already stripped of port/userinfo; lower() makes the
        # comparison case-insensitive, as host names are.
        host = (parsed_url.hostname or '').lower()
        if host not in ('github.com', 'www.github.com'):
            raise ValueError('Invalid GitHub URL')

        # Drop empty segments so stray slashes cannot produce an empty
        # owner or repository name.
        parts = [segment for segment in parsed_url.path.split('/') if segment]
        if len(parts) < 2:
            raise ValueError('Invalid GitHub repository path')

        owner, repo_name = parts[0], parts[1]
        # Clone URLs end in ".git", which is not part of the repository name.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        if not repo_name:
            raise ValueError('Invalid GitHub repository path')

        # Only the owner and repository name are returned.
        return owner, repo_name
    except Exception as e:
        logging.error(f'Error parsing GitHub URL {url}: {str(e)}')
        raise
# Authenticate once at start-up. The rest of the script needs the `github`
# client, so every failure path here ends the process with exit status 1.
github_token = get_github_token()
if not github_token:
    logging.error('GitHub token not found. Please create ~/.github_token file with your token')
    # SystemExit is raised directly: the exit() helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    raise SystemExit(1)

try:
    github = github3.login(token=github_token)
    if not github:
        raise Exception('Failed to authenticate with GitHub')
    # NOTE(review): a truthy client may only mean a session object was
    # built; confirm whether login() actually validates the token.
    logging.info('Successfully authenticated with GitHub')
except Exception as e:
    logging.error(f'Error authenticating with GitHub: {str(e)}')
    raise SystemExit(1)
# Projects to process, each identified by its full GitHub URL together
# with a size label.
projects = [
    {
        'url': 'https://github.com/THUDM/CodeGeeX',
        'size': 'L',
    },
]

# Holds the collected records.
data = []
# Change categories and the commit-message phrases that signal each one.
# Definition order matters: the first category with a matching phrase wins.
CHANGE_TYPES = {
    'api': ['add api', 'modify api', 'update api'],
    'refactoring': ['refactor', 'refactoring'],
    'bugfix': ['fix', 'bugfix', 'bug fix'],
}


def get_change_type(message):
    """Classify a commit message by keyword search.

    Args:
        message: The commit message text.

    Returns:
        The first key of ``CHANGE_TYPES`` (in definition order) that has a
        keyword occurring as a substring of the lowercased message, or
        ``None`` when no keyword matches.
    """
    lowered = message.lower()
    matching_labels = (
        label
        for label, phrases in CHANGE_TYPES.items()
        if any(phrase in lowered for phrase in phrases)
    )
    return next(matching_labels, None)
def get_commit_details(repo, commit_sha):
    """Fetch a single commit and summarise it as a plain dict.

    Args:
        repo: Repository object exposing ``commit(sha)``.
        commit_sha: SHA of the commit to look up.

    Returns:
        A dict with the keys ``commit_sha``, ``commit_message``,
        ``committer_date`` and ``affected_files`` (a list of file paths).
        ``committer_date`` is passed through unchanged unless it arrives
        as a string, in which case it is parsed into a timezone-aware
        ``datetime`` so that callers can rely on ``.isoformat()``.

    Raises:
        ValueError: If a string timestamp is not in the expected
            ``YYYY-MM-DDTHH:MM:SS`` + offset form.
    """
    commit = repo.commit(commit_sha)
    git_commit = commit.commit

    # NOTE(review): github3.py appears to expose the committer and the file
    # entries as raw dicts rather than attribute objects - confirm against
    # the installed version. Both shapes are accepted here.
    committer = git_commit.committer
    committed_at = committer['date'] if isinstance(committer, dict) else committer.date
    if isinstance(committed_at, str):
        # Timestamps such as 2023-05-01T12:00:00Z: rewrite the trailing "Z"
        # as a numeric offset, which %z accepts on every Python 3 version.
        committed_at = datetime.strptime(
            committed_at.replace('Z', '+0000'), '%Y-%m-%dT%H:%M:%S%z'
        )

    affected_files = [
        entry['filename'] if isinstance(entry, dict) else entry.filename
        for entry in (commit.files or [])
    ]

    return {
        'commit_sha': commit.sha,
        'commit_message': git_commit.message,
        'committer_date': committed_at,
        'affected_files': affected_files,
    }
def get_test_files(repo, commit_sha):
    """Return the paths touched by a commit that look like test files.

    A path qualifies when it contains the substring ``test`` or ``spec``
    (case-sensitive). This is a plain substring check, so names such as
    ``latest.py`` or ``inspect.py`` also match.

    Args:
        repo: Repository object exposing ``commit(sha)``.
        commit_sha: SHA of the commit to inspect.

    Returns:
        A list of matching file paths, in the order the commit lists them.
    """
    commit = repo.commit(commit_sha)
    # NOTE(review): github3.py appears to expose file entries as raw dicts
    # rather than attribute objects - confirm against the installed version.
    # Both shapes are accepted, and a missing file list counts as empty.
    filenames = (
        entry['filename'] if isinstance(entry, dict) else entry.filename
        for entry in (commit.files or [])
    )
    return [name for name in filenames if 'test' in name or 'spec' in name]
import itertools
def collect_data(repo, project_size, max_commits=5):
    """Scan a repository's recent commits and record the classified ones.

    Each inspected commit whose message is classified by
    ``get_change_type`` is appended to the module-level ``data`` list as
    one record.

    Args:
        repo: Repository object; ``commits()``, ``full_name`` and
            ``html_url`` are used, plus ``default_branch`` when present.
        project_size: Size label stored as the record's ``size_category``.
        max_commits: How many commits to take from the start of
            ``repo.commits()`` (default 5, kept small for testing).
            ``None`` inspects all of them.

    Returns:
        None. Errors are logged instead of raised; a failure on one commit
        does not stop the remaining commits from being processed.
    """
    try:
        logging.info(f'Collecting data for repository: {repo.full_name}')
        repo_commits = repo.commits()
        # Materialise before testing for emptiness: commits() is consumed
        # as an iterator, and an iterator object is presumably truthy even
        # when it yields nothing, so checking it directly never fires.
        commits = list(itertools.islice(repo_commits, max_commits)) if repo_commits else []
    except Exception as e:
        logging.error(f'Error collecting data from repository {repo.full_name}: {str(e)}')
        return

    if not commits:
        logging.warning(f'No commits found in repository: {repo.full_name}')
        return

    # NOTE(review): commits() is called without a branch, which presumably
    # lists the default branch - confirm. 'master' stays as the last resort.
    fallback_branch = getattr(repo, 'default_branch', None) or 'master'

    for commit in commits:
        try:
            change_type = get_change_type(commit.commit.message)
            if not change_type:
                continue

            # NOTE(review): each helper below fetches the commit again, so
            # every recorded commit costs two extra API requests.
            commit_details = get_commit_details(repo, commit.sha)
            test_files = get_test_files(repo, commit.sha)

            # The committer may be None or a user object; only a mapping
            # can carry a 'branch' key, so anything else uses the fallback
            # instead of raising on the membership test.
            committer = getattr(commit, 'committer', None)
            if isinstance(committer, dict) and 'branch' in committer:
                branch = committer['branch']
            else:
                branch = fallback_branch

            # Tolerate a date that is already a string.
            committed_at = commit_details['committer_date']
            if hasattr(committed_at, 'isoformat'):
                timestamp = committed_at.isoformat()
            else:
                timestamp = committed_at

            data.append({
                'project': {
                    'repo_url': repo.html_url,
                    'size_category': project_size,
                    'language': 'Python'
                },
                'change': {
                    'type': change_type,
                    'description': commit.commit.message,
                    'task_link': None,
                    'branch': branch,
                    'affected_files': commit_details['affected_files'],
                    'commit': {
                        # TODO: look up the commit that precedes this change.
                        'before': None,
                        'after': commit_details['commit_sha']
                    },
                    'tests': {
                        'file_paths': test_files,
                        # Module name = file name up to its first dot.
                        'modules': [path.split('/')[-1].split('.')[0] for path in test_files]
                    },
                    'timestamp': timestamp
                }
            })
        except Exception as e:
            logging.error(f'Error collecting data from repository {repo.full_name}: {str(e)}')
# Process every configured project; a failure in one project is logged and
# the loop moves on to the next.
for project in projects:
    try:
        # Derive the owner and repository name from the project URL.
        owner, repo_name = parse_github_url(project['url'])
        logging.info(f'Processing project: {owner}/{repo_name}')

        repo = github.repository(owner, repo_name)
        if repo:
            collect_data(repo, project['size'])
        else:
            logging.error(f'Repository not found: {project["url"]}')
    except Exception as e:
        logging.error(f'Error processing project {project["url"]}: {str(e)}')
# Save the collected data to a JSON file.
import json

try:
    output_file = 'collected_data_url.json'
    # Serialise before opening the file: if a record cannot be encoded, the
    # error surfaces here and no truncated output file is left behind.
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(serialized)
    logging.info(f'Successfully saved data to {output_file}')
    logging.info(f'Total records collected: {len(data)}')
except Exception as e:
    logging.error(f'Error saving data to file: {str(e)}')
    # SystemExit is raised directly: the exit() helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    raise SystemExit(1)