-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcodetf.py
More file actions
279 lines (226 loc) · 8.36 KB
/
codetf.py
File metadata and controls
279 lines (226 loc) · 8.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
from __future__ import annotations
from enum import Enum
from typing import Optional
from pydantic import BaseModel, ConfigDict, model_validator
from codemodder.logging import logger
from ..common import Change, CodeTFWriter, Finding, FixQuality
from ..v2.codetf import AIMetadata as AIMetadatav2
from ..v2.codetf import ChangeSet as v2ChangeSet
from ..v2.codetf import CodeTF as CodeTFv2
from ..v2.codetf import Finding as v2Finding
from ..v2.codetf import Result
from ..v2.codetf import Run as Runv2
class Run(BaseModel):
    """Metadata about the analysis run that produced the results"""

    # Tool vendor/organization identifier
    vendor: str
    # Name of the tool that performed the analysis
    tool: str
    # Version string of the tool
    version: str
    # optional free-form metadata about the project being analyzed
    # e.g. project name, directory, commit sha, etc.
    projectmetadata: dict | None = None
    # analysis duration in milliseconds
    elapsed: int | None = None
    # optional free-form metadata about the inputs used for the analysis
    # e.g. command line, environment variables, etc.
    inputmetadata: dict | None = None
    # optional free-form metadata about the analysis itself
    # e.g. timeouts, memory usage, etc.
    analysismetadata: dict | None = None
class FixStatusType(str, Enum):
    """Status of a fix.

    NOTE(fix): the previous ``model_config = ConfigDict(frozen=True)`` line was
    removed. ``ConfigDict(...)`` produces a plain dict, which is not a
    descriptor, so the Enum metaclass converted it into a spurious fifth
    member ``FixStatusType.model_config`` (with the stringified dict as its
    value). ``model_config`` is a pydantic BaseModel concept and has no effect
    on an Enum; Enum members are already immutable.
    """

    fixed = "fixed"
    skipped = "skipped"
    failed = "failed"
    wontfix = "wontfix"
class FixStatus(BaseModel):
    """Metadata describing fix outcome"""

    # Immutable once constructed
    model_config = ConfigDict(frozen=True)

    # Outcome category (fixed/skipped/failed/wontfix)
    status: FixStatusType
    # Short human-readable reason for a non-fixed outcome
    reason: Optional[str] = None
    # Longer explanation, e.g. error output or skip rationale
    details: Optional[str] = None
class ChangeSet(BaseModel):
    """A set of changes applied to a single file as part of a fix."""

    # Immutable once constructed
    model_config = ConfigDict(frozen=True)

    # Path of the modified file
    path: str
    # Unified diff of the file's modification
    diff: str
    # Individual line-level changes within the file
    # (pydantic deep-copies mutable defaults, so the [] default is safe)
    changes: list[Change] = []
class Reference(BaseModel):
    """A URL reference supporting a fix; description falls back to the URL."""

    url: str
    description: Optional[str] = None

    @model_validator(mode="after")
    def validate_description(self):
        # A missing (or empty) description defaults to the URL itself so
        # consumers always have display text.
        if not self.description:
            self.description = self.url
        return self
class Strategy(str, Enum):
    """How a fix was generated."""

    # Fully AI-generated fix
    ai = "ai"
    # Mix of deterministic transformation and AI assistance
    hybrid = "hybrid"
    # Purely rule-based/deterministic fix
    deterministic = "deterministic"
class AIMetadata(BaseModel):
    """Metadata about AI usage during fix generation."""

    # Immutable once constructed
    model_config = ConfigDict(frozen=True)

    # AI provider name, e.g. the LLM service used
    provider: Optional[str] = None
    # Model identifiers used for generation (v2 carried a single model)
    models: Optional[list[str]] = None
    # Token accounting, when reported by the provider
    total_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    prompt_tokens: Optional[int] = None
class GenerationMetadata(BaseModel):
    """Describes how a fix was produced (strategy, AI usage, confidence)."""

    # Immutable once constructed
    model_config = ConfigDict(frozen=True)

    # Generation strategy (ai/hybrid/deterministic)
    strategy: Strategy
    # AI usage details; only meaningful for ai/hybrid strategies
    ai: Optional[AIMetadata] = None
    # True when the fix is tentative and should be reviewed before acceptance
    provisional: bool
class FixMetadata(BaseModel):
    """Descriptive metadata about a fix and its provenance."""

    # Immutable once constructed
    model_config = ConfigDict(frozen=True)

    # Fix provider ID, corresponds to legacy codemod ID
    id: str
    # A brief summary of the fix
    summary: str
    # A detailed description of the fix
    description: str
    # Supporting links (advisories, docs, etc.)
    references: list[Reference] = []
    # How the fix was generated
    generation: GenerationMetadata
class FixResult(BaseModel):
    """Result corresponding to a single finding"""

    model_config = ConfigDict(frozen=True)

    finding: Finding
    fixStatus: FixStatus
    changeSets: list[ChangeSet] = []
    fixMetadata: Optional[FixMetadata] = None
    fixQuality: Optional[FixQuality] = None
    # A description of the reasoning process that led to the fix
    reasoningSteps: Optional[list[str]] = None

    @model_validator(mode="after")
    def validate_fixMetadata(self):
        # Only "fixed" results are required to carry changesets and metadata;
        # all other statuses pass through untouched.
        if self.fixStatus.status != FixStatusType.fixed:
            return self
        if not self.changeSets:
            raise ValueError("changeSets must be provided for fixed results")
        if not self.fixMetadata:
            raise ValueError("fixMetadata must be provided for fixed results")
        return self
class CodeTF(CodeTFWriter, BaseModel):
    """Top-level CodeTF document: one analysis run plus its per-finding results."""

    # Metadata about the analysis run
    run: Run
    # One result per finding (fixed or unfixed)
    results: list[FixResult]
def from_v2_run(run: Runv2) -> Run:
    """Convert a v2 Run into the v3 shape.

    v2-specific scalar fields (directory, projectName, commandLine, sarifs)
    are folded into the v3 free-form metadata dicts; falsy optional fields
    are omitted.
    """
    project_metadata: dict = {"directory": run.directory}
    if run.projectName:
        project_metadata["projectName"] = run.projectName

    input_metadata: dict = {"commandLine": run.commandLine}
    if run.sarifs:
        input_metadata["sarifs"] = run.sarifs

    return Run(
        vendor=run.vendor,
        tool=run.tool,
        version=run.version,
        elapsed=run.elapsed,
        projectmetadata=project_metadata,
        inputmetadata=input_metadata,
    )
def from_v2_aimetadata(ai_metadata: AIMetadatav2) -> AIMetadata:
    """Convert v2 AIMetadata to v3.

    v2 records a single model name; v3 stores a list of models. The v2
    `tokens` field maps to v3 `total_tokens`.
    """
    model_list = [ai_metadata.model] if ai_metadata.model else None
    return AIMetadata(
        provider=ai_metadata.provider,
        models=model_list,
        total_tokens=ai_metadata.tokens,
        completion_tokens=ai_metadata.completion_tokens,
        prompt_tokens=ai_metadata.prompt_tokens,
    )
def from_v2_result_per_finding(
    result: Result,
    strategy: Strategy | None = None,
    ai_metadata: AIMetadata | None = None,
    provisional: bool | None = None,
) -> FixResult | None:
    """Convert a v2 Result into a single v3 FixResult.

    This transformation assumes that the v2 result will only contain a single
    fixedFinding for all changesets. The first changeset carrying a fixed
    finding (either at the changeset level or on one of its changes) supplies
    the finding and the generation metadata; all changesets are carried over.

    :param result: the v2 result to convert
    :param strategy: optional override for the generation strategy
    :param ai_metadata: optional override for the AI metadata
    :param provisional: optional override for the provisional flag
    :returns: the converted FixResult, or None when no fixed finding exists
    """
    changeset: v2ChangeSet | None = None
    finding: v2Finding | None = None
    # Find the changeset with a fixedFinding
    for cs in result.changeset:
        if cs.fixedFindings:
            changeset = cs
            finding = cs.fixedFindings[0]
        else:
            # check each individual change
            for change in cs.changes:
                if change.fixedFindings:
                    changeset = cs
                    finding = change.fixedFindings[0]
                    break
        if changeset is not None:
            # Fix: stop at the first match. Previously only the inner loop
            # broke, so a later changeset could overwrite the first finding.
            break
    if changeset is None or finding is None:
        logger.debug("Either no changesets or fixed finding in the result")
        return None

    v3changesets = [
        ChangeSet(
            path=cs.path, diff=cs.diff, changes=[c.to_common() for c in cs.changes]
        )
        for cs in result.changeset
    ]

    # Generate the GenerationMetadata from the changeset if not passed as a parameter
    fix_result_strategy = strategy or (
        Strategy.ai if changeset.ai else Strategy.deterministic
    )
    fix_result_ai_metadata = ai_metadata or (
        from_v2_aimetadata(changeset.ai) if changeset.ai else None
    )
    fix_result_provisional = provisional or changeset.provisional or False
    generation_metadata = GenerationMetadata(
        strategy=fix_result_strategy,
        ai=fix_result_ai_metadata,
        provisional=fix_result_provisional,
    )
    fix_metadata = FixMetadata(
        id=result.codemod,
        summary=result.summary,
        description=result.description,
        generation=generation_metadata,
    )
    return FixResult(
        finding=Finding(**finding.model_dump()),
        fixStatus=FixStatus(status=FixStatusType.fixed),
        changeSets=v3changesets,
        fixMetadata=fix_metadata,
    )
def from_v2_result(result: Result) -> list[FixResult]:
    """Expand a v2 Result into one v3 FixResult per fixed/unfixed finding."""
    results: list[FixResult] = []

    # One "fixed" result per fixed finding attached to any change.
    for v2cs in result.changeset:
        # No way of identifying hybrid AI codemods by the metadata alone
        generation = GenerationMetadata(
            strategy=Strategy.ai if v2cs.ai else Strategy.deterministic,
            ai=from_v2_aimetadata(v2cs.ai) if v2cs.ai else None,
            provisional=False,
        )
        # FixMetadata is identical for every finding in this changeset.
        metadata = FixMetadata(
            id=result.codemod,
            summary=result.summary,
            description=result.description,
            generation=generation,
        )
        for change in v2cs.changes:
            # Retrieve diff from changeset since individual diffs per change may not exist
            # If the codetf was generated with per-finding, each ChangeSet will have a single change anyway
            v3cs = ChangeSet(
                path=v2cs.path, diff=v2cs.diff, changes=[change.to_common()]
            )
            for fixed in change.fixedFindings or []:
                results.append(
                    FixResult(
                        finding=Finding(**fixed.model_dump()),
                        fixStatus=FixStatus(status=FixStatusType.fixed),
                        changeSets=[v3cs],
                        fixMetadata=metadata,
                    )
                )

    # One "failed" result per finding the codemod could not fix.
    for unfixed in result.unfixedFindings or []:
        results.append(
            FixResult(
                finding=Finding(**unfixed.model_dump()),
                fixStatus=FixStatus(
                    status=FixStatusType.failed, reason=unfixed.reason
                ),
            )
        )
    return results
def from_v2(codetf: CodeTFv2) -> CodeTF:
    """Translate a complete v2 CodeTF document into the v3 format."""
    all_results: list[FixResult] = []
    for v2_result in codetf.results:
        all_results.extend(from_v2_result(v2_result))
    return CodeTF(run=from_v2_run(codetf.run), results=all_results)