Conversation
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 10 days ago
In general terms, we need to ensure that any user-controlled path is validated and constrained before being used with filesystem APIs. In this case, we should ensure that output_path (if provided) resolves to a subdirectory of a server-controlled base directory, after normalizing the path. If it does not, we should reject it with a clear error. We should also avoid letting the user force absolute paths.
The best fix here is to add a helper method in SynthesisDatasetExporter that computes a safe output directory: it should decide on a base export root (for example, a fixed directory under the application’s working directory), normalize the combination of that root and any user-supplied output_path, and then verify that the resulting path stays under the root. Then export_data should call this helper instead of using self._output_path directly. This maintains the existing behavior (allowing per-request subdirectories) but prevents directory traversal and arbitrary absolute paths.
Concretely, in runtime/datamate-python/app/module/generation/service/export_service.py:
- Add a private method, e.g. `_get_safe_output_dir`, to `SynthesisDatasetExporter`:
  - If `self._output_path` is falsy: return `_get_default_output_dir()` (existing behavior).
  - Otherwise:
    - Choose a base root, for example `base_root = os.path.abspath(self._get_default_output_dir())`. This keeps everything under whatever default export root the app already uses.
    - Normalize the user-specified relative path: `safe_rel = os.path.normpath(self._output_path)`.
    - Reject absolute paths or paths that start with `os.pardir` (`..`) after normalization, by raising `SynthesisExportError`.
    - Compose the final path: `candidate = os.path.abspath(os.path.join(base_root, safe_rel))`.
    - Verify that `candidate` starts with `base_root + os.sep` or equals `base_root`. If not, raise `SynthesisExportError`.
    - Return `candidate`.
- Update `export_data` so that line 151 uses `output_dir = self._get_safe_output_dir()` instead of `self._output_path or self._get_default_output_dir()`.
No new imports are needed beyond os, which is already imported. All changes are confined to SynthesisDatasetExporter in export_service.py.
| @@ -46,6 +46,37 @@ | ||
| self._format = format if format in self.SUPPORTED_FORMATS else self.DEFAULT_FORMAT | ||
| self._output_path = output_path | ||
|
|
||
| def _get_safe_output_dir(self) -> str: | ||
| """ | ||
| 获取安全的导出目录。 | ||
|
|
||
| 如果未显式指定 output_path,则退回到默认导出目录。 | ||
| 如果指定了 output_path,则将其视为默认导出目录下的相对路径, | ||
| 并进行规范化和越权访问检查,防止目录遍历或写入任意位置。 | ||
| """ | ||
| # 默认根目录(应用已有的默认导出目录) | ||
| base_root = os.path.abspath(self._get_default_output_dir()) | ||
|
|
||
| # 如果没有用户指定的输出路径,直接使用默认导出目录 | ||
| if not self._output_path: | ||
| return base_root | ||
|
|
||
| # 规范化用户提供的路径,防止 ".." 等路径片段 | ||
| user_path = os.path.normpath(self._output_path) | ||
|
|
||
| # 禁止绝对路径,以及以父目录开头的相对路径 | ||
| if os.path.isabs(user_path) or user_path.startswith(os.pardir + os.sep) or user_path == os.pardir: | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| # 将用户路径视为 base_root 下的子目录 | ||
| candidate = os.path.abspath(os.path.join(base_root, user_path)) | ||
|
|
||
| # 再次确认结果路径仍在 base_root 下 | ||
| if not (candidate == base_root or candidate.startswith(base_root + os.sep)): | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| return candidate | ||
|
|
||
| async def export_task_to_dataset( | ||
| self, | ||
| task_id: str, | ||
| @@ -148,7 +179,7 @@ | ||
| if not file_instances: | ||
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| output_dir = self._get_safe_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| file_paths: List[str] = [] |
| def _write_jsonl(self, path: str, records: Iterable[dict], format: Optional[str] = None) -> None: | ||
| """写入 JSONL 文件""" | ||
| fmt = format or self._format | ||
| os.makedirs(os.path.dirname(path), exist_ok=True) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 10 days ago
In general, to fix uncontrolled path usage you should (a) restrict writes to a known safe base directory and/or (b) normalize and validate any user-provided path segments before using them with filesystem APIs. For paths that may include subdirectories, the standard pattern is: choose a fixed base_dir, compute full_path = os.path.normpath(os.path.join(base_dir, user_path)), then ensure full_path is still under base_dir before creating directories or opening files.
For this codebase, the minimal, non‑breaking fix is:
- Introduce a helper on `SynthesisDatasetExporter` that turns `self._output_path` (which may be user-controlled) into a validated, absolute directory under a server-controlled root. We can:
  - Pick a safe root directory, e.g. the system temp directory returned by `tempfile.gettempdir()` (consistent with `_get_default_output_dir`), and create a fixed subfolder such as `"synthesis_exports"`.
  - If `self._output_path` is provided, treat it as a subdirectory or file name under that root, not as an absolute path. Normalize with `os.path.normpath` and check that the resulting resolved directory starts with the chosen root (prefix check on absolute paths).
  - If the check fails, raise `SynthesisExportError`.
- Use this helper both where the directory is created (`export_data`) and where `_write_jsonl` constructs parent directories:
  - In `export_data`, replace `output_dir = self._output_path or self._get_default_output_dir()` with logic that computes a safe `output_dir` using the helper.
  - In `_write_jsonl`, derive the parent directory with `os.path.dirname(path)` and still call `os.makedirs(..., exist_ok=True)`, but only after `path` has been constructed with the validated `output_dir`.
- Keep the external behaviour similar: callers can still pass `output_path`, but it will now be interpreted safely (as a subpath under the export root) instead of an arbitrary filesystem location.
Concretely, all changes are limited to runtime/datamate-python/app/module/generation/service/export_service.py:
- Add a new private method `_get_safe_output_dir` near `_get_default_output_dir` that:
  - Imports `tempfile` locally (like `_get_default_output_dir`).
  - Defines `base_root = os.path.join(tempfile.gettempdir(), "synthesis_exports")`.
  - If `self._output_path` is falsy, returns `base_root`.
  - Otherwise, builds `candidate = os.path.join(base_root, self._output_path)`, normalizes to `normalized = os.path.normpath(candidate)`, and ensures `os.path.commonpath([normalized, base_root]) == base_root`. If not, raise `SynthesisExportError`.
  - Returns `normalized`.
- Modify `export_data` so `output_dir` is assigned by calling `self._get_safe_output_dir()` and then `os.makedirs(output_dir, exist_ok=True)` remains.
No changes are needed in generation_api.py or to imports beyond using existing os and a local tempfile import inside _get_safe_output_dir (similar to _get_default_output_dir).
| @@ -148,7 +148,8 @@ | ||
| if not file_instances: | ||
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| # 使用受控且经过校验的输出目录,防止目录穿越或任意路径写入 | ||
| output_dir = self._get_safe_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| file_paths: List[str] = [] | ||
| @@ -265,6 +266,38 @@ | ||
| import tempfile | ||
| return tempfile.gettempdir() | ||
|
|
||
| def _get_safe_output_dir(self) -> str: | ||
| """ | ||
| 获取并校验导出输出目录,确保位于受控根目录下 | ||
|
|
||
| - 如果未指定 output_path,则使用系统临时目录下的固定子目录。 | ||
| - 如果指定了 output_path,则将其视为该根目录下的相对子路径,并进行归一化和越界检查。 | ||
| """ | ||
| import tempfile | ||
|
|
||
| # 受控的导出根目录,例如: /tmp/synthesis_exports | ||
| base_root = os.path.join(tempfile.gettempdir(), "synthesis_exports") | ||
|
|
||
| # 未指定 output_path 时,直接使用受控根目录 | ||
| if not self._output_path: | ||
| return base_root | ||
|
|
||
| # 将用户提供的 output_path 视为 base_root 下的相对路径 | ||
| candidate = os.path.join(base_root, self._output_path) | ||
| normalized = os.path.normpath(candidate) | ||
|
|
||
| # 防止目录穿越,确保归一化后的路径仍位于 base_root 下 | ||
| try: | ||
| common = os.path.commonpath([normalized, base_root]) | ||
| except ValueError: | ||
| # 不同驱动器等情况视为非法路径 | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| if common != base_root: | ||
| raise SynthesisExportError("Output path is outside of allowed export directory") | ||
|
|
||
| return normalized | ||
|
|
||
| @staticmethod | ||
| def _ensure_dataset_path(dataset: Dataset) -> str: | ||
| """确保数据集路径存在""" |
No description provided.