Skip to content

fix: harden runtime and supply chain security#6116

Open
zouyonghe wants to merge 10 commits intoAstrBotDevs:masterfrom
zouyonghe:pr/security-hardening-upstream
Open

fix: harden runtime and supply chain security#6116
zouyonghe wants to merge 10 commits intoAstrBotDevs:masterfrom
zouyonghe:pr/security-hardening-upstream

Conversation

@zouyonghe
Copy link
Member

@zouyonghe zouyonghe commented Mar 12, 2026

Summary

This PR hardens the code paths that were confirmed as real security issues in the current scanner/review pass, while deliberately leaving false positives and compatibility-sensitive items unchanged.

What changed

Python backend

  • Parameterized the SQLite v3 stats and conversation queries in astrbot/core/db/migration/sqlite_v3.py and added regression coverage to ensure unsafe search text is only passed via bound parameters.
  • Kept local Python execution on a fixed interpreter (sys.executable) with shell=False in astrbot/core/computer/booters/local.py, plus an explicit audit note and tests.
  • Kept the docs upload script on argv-list subprocess execution with shell=False in docs/scripts/upload_doc_images_to_r2.py, plus an explicit audit note and tests.

WebSocket transport hardening

  • Added astrbot/core/platform/sources/websocket_security.py as a shared transport validator/converter.
  • Enforced secure transport for non-local Misskey and Satori endpoints, while still allowing localhost/private-network development endpoints.
  • Tightened the host check so bare hostnames like prod / api are no longer treated as local.
  • Hardened to_websocket_url() so unsupported schemes fail with a clear ValueError instead of relying on a mapping lookup.
  • Added positive and negative regression tests for the helper and StreamingClient.

T2I templates and frontend rendering

  • Replaced direct script-context interpolation with tojson in the T2I HTML templates.
  • Removed the old backtick escaping workaround in astrbot/core/utils/t2i/network_strategy.py because JSON serialization now handles that safely.
  • Sanitized or removed risky v-html usage in the dashboard/docs UI:
    • sanitize highlighted code output in IPythonToolBlock.vue
    • sanitize rendered inline markdown hints in AstrBotConfigV4.vue
    • replace dynamic HTML title rendering in Logo.vue
    • replace SVG-string v-html usage in docs/.vitepress/theme/components/ArticleShare.vue
  • Added tests covering template serialization and preserved markdown/backticks.

Dashboard dependency remediation

  • Upgraded and refreshed the dashboard dependency set to land patched versions for the reported advisories, including vite, axios, lodash, markdown-it, dompurify, and related transitive packages.
  • Kept the final lockfile in sync with the current manifest and added overrides where needed to constrain vulnerable transitive dependencies.

CI / workflow hygiene

  • Replaced mutable workflow refs such as @master with explicit released version tags across the touched GitHub Actions workflows.
  • The final workflow state intentionally uses version tags rather than 40-character SHAs.

Kubernetes runtime hardening

  • Kept the existing pod-level non-root settings already added in this branch.
  • Added seccompProfile: RuntimeDefault, allowPrivilegeEscalation: false, and capabilities.drop: [ALL] to the affected Kubernetes workloads.

Tests

  • Added targeted regression coverage in:
    • tests/unit/test_computer.py
    • tests/unit/test_sqlite_v3.py
    • tests/unit/test_t2i_security.py
    • tests/unit/test_websocket_security.py
    • docs/tests/test_upload_doc_images_to_r2.py

Why these changes

The goal of this PR is to fix the scanner findings that correspond to real exploit paths without widening the scope into unrelated product or deployment policy changes.

In particular:

  • raw SQL string construction was replaced with bound parameters because that is a true injection risk
  • unsafe HTML / script-context rendering paths were changed because they can become XSS sinks
  • non-local plaintext WebSocket endpoints are now rejected in runtime code because they expose real traffic confidentiality/integrity risk
  • workflow refs were moved off mutable branches because branch refs are weaker supply-chain controls than explicit releases
  • Kubernetes securityContext hardening was kept because it is runtime defense-in-depth and low-risk relative to the current manifests

False positives / intentional scope decisions

These items came up during scanning/review but are intentionally not changed in this PR:

  • Generic API key findings in defaults, tests, and docs: these are placeholder/default/example values rather than live secrets committed to the repository.
  • NapCat docs ws:// examples: the temporary doc rewrite was reverted. Those examples document local or same-network setups and are not the runtime security boundary. The runtime code changes for Misskey/Satori remain in place.
  • subprocess.run findings in local.py and upload_doc_images_to_r2.py: these calls are audited argv-list invocations with shell=False, not shell-string command construction. This PR keeps the safe calling pattern and records the audit intent inline.
  • Dockerfile non-root runtime: that change was reverted for compatibility with existing container deployments and mounted data directories.
  • Image latest tags in Kubernetes: this PR does not change image mutability policy. The branch keeps the current latest tags by request.
  • Full SHA pinning for GitHub Actions: the branch now uses explicit release tags instead of mutable branches, but not 40-character commit SHAs.

Test Plan

  • uv run pytest tests/unit/test_computer.py tests/unit/test_sqlite_v3.py tests/unit/test_t2i_security.py tests/unit/test_websocket_security.py docs/tests/test_upload_doc_images_to_r2.py
  • uv run ruff check astrbot/core/computer/booters/local.py astrbot/core/db/migration/sqlite_v3.py astrbot/core/platform/sources/websocket_security.py astrbot/core/platform/sources/misskey/misskey_api.py astrbot/core/platform/sources/satori/satori_adapter.py docs/scripts/upload_doc_images_to_r2.py tests/unit/test_sqlite_v3.py tests/unit/test_t2i_security.py tests/unit/test_websocket_security.py
  • uv run ruff format --check .
  • pnpm build (in dashboard)
  • pnpm docs:build (in docs)
  • targeted pnpm audit checks for the remediated dashboard advisories

Not run

  • docker build -f Dockerfile . (local Docker daemon unavailable in this environment)

由 Sourcery 提供的摘要

在加强运行时、传输层和前端安全性的同时,进一步收紧 CI、Kubernetes 与依赖项配置。

Bug 修复:

  • 在 SQLite 统计与会话查询中,消除原始 SQL 字符串拼接,改用绑定参数以防止注入。
  • 通过校验协议并强制使用安全传输,防止 Misskey 和 Satori 使用不安全或明文的远程 WebSocket 连接。
  • 移除 T2I 模板和仪表盘组件中不安全的脚本上下文与 HTML 渲染,这些位置可能成为 XSS 注入点。
  • 确保本地 Python 执行和文档上传脚本在调用子进程时使用固定的可执行文件路径和 argv 参数列表,并禁用 shell,从而避免命令注入。

增强:

  • 引入共享的 WebSocket URL 校验与转换辅助函数,以在各平台集中统一地强制安全传输。
  • 优化仪表盘 Logo 渲染和文档分享按钮的标记结构,在保留布局行为的同时避免动态 HTML 注入。
  • 通过设置非 root Pod、使用 RuntimeDefault seccomp 策略、禁用权限提升以及丢弃多余能力,增强 Kubernetes 工作负载的安全性。

构建:

  • 更新仪表盘依赖(包括 vite、axios、lodash、markdown-it、dompurify 及相关传递依赖),以引入针对已知安全公告的修复版本。
  • 添加 pnpm overrides,限制如 immutable 和 lodash-es 等存在漏洞的传递依赖版本范围。

CI:

  • 将 GitHub Actions 工作流和与 Docker 相关的 actions 固定到明确发布的版本标签,而不再使用 master 或主版本等可变分支标签。

测试:

  • 新增回归测试,覆盖本地 Python 子进程执行、SQLite 查询参数化、T2I 模板序列化、WebSocket URL 安全性以及文档图片上传脚本。
Original summary in English

Summary by Sourcery

在强化运行时、传输层和前端安全性的同时,收紧 CI、Kubernetes 和仪表盘依赖配置。

Bug 修复:

  • 通过使用绑定参数和命名占位符,防止 SQLite 统计和会话查询中的 SQL 注入。
  • 对非本地的 Misskey 和 Satori 端点强制使用安全的 WebSocket 传输,并拒绝不受支持或不安全的远程 URL。
  • 消除 T2I 模板和仪表盘组件中不安全的 HTML/脚本上下文渲染,避免形成 XSS 漏洞入口。
  • 确保本地 Python 执行和文档图片上传脚本使用固定解释器和 argv 列表形式的 subprocess 调用,并禁用 shell,从而避免命令注入。

增强:

  • 引入共享的 WebSocket 安全助手,用于验证/转换 URL,并集中管理传输加固逻辑。
  • 优化仪表盘的 logo 和文档分享组件,在保留布局和行为的前提下避免动态 HTML 注入。
  • 通过在 Pod 和容器层面设置 securityContext(包括 RuntimeDefault seccomp 和移除多余 capabilities)来加强 Kubernetes 工作负载的安全性。

构建:

  • 更新仪表盘依赖(包括 vite、axios、lodash、markdown-it、dompurify 及其传递依赖),并添加 pnpm overrides 以修复已报告的安全问题。

CI:

  • 将 GitHub Actions 和与 Docker 相关的工作流固定到明确发布的版本标签,而不是可变分支。

测试:

  • 新增回归测试,用于覆盖 SQLite 查询参数化、WebSocket URL 安全性、T2I 模板序列化、本地 Python 子进程执行以及文档图片上传脚本。
Original summary in English

Summary by Sourcery

Harden runtime, transport, and frontend security while tightening CI, Kubernetes, and dashboard dependency configurations.

Bug Fixes:

  • Prevent SQL injection in SQLite stats and conversation queries by using bound parameters and named placeholders.
  • Enforce secure WebSocket transport for non-local Misskey and Satori endpoints and reject unsupported or insecure remote URLs.
  • Eliminate unsafe HTML/script-context rendering in T2I templates and dashboard components that could act as XSS sinks.
  • Ensure local Python execution and docs image upload scripts use a fixed interpreter and argv-list subprocess invocation with shell disabled to avoid command injection.

Enhancements:

  • Introduce a shared WebSocket security helper to validate/convert URLs and centralize transport hardening logic.
  • Refine dashboard logo and docs sharing components to avoid dynamic HTML injection while preserving layout and behavior.
  • Strengthen Kubernetes workloads with pod- and container-level securityContext settings, including RuntimeDefault seccomp and dropped capabilities.

Build:

  • Refresh dashboard dependencies (including vite, axios, lodash, markdown-it, dompurify and transitive packages) and add pnpm overrides to remediate reported advisories.

CI:

  • Pin GitHub Actions and Docker-related workflows to explicit released version tags instead of mutable branches.

Tests:

  • Add regression tests for SQLite query parameterization, WebSocket URL security, T2I template serialization, local Python subprocess execution, and the docs image upload script.

@auto-assign auto-assign bot requested review from Soulter and advent259141 March 12, 2026 09:52
@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Mar 12, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the overall security posture of the application by addressing various runtime, supply chain, and code-level vulnerabilities. It introduces stricter security configurations for container deployments, enforces secure communication protocols for external WebSocket connections, and eliminates potential injection risks in Python code and frontend rendering. Additionally, it updates several third-party dependencies to their latest secure versions and adds dedicated tests to prevent regressions in these critical security areas.

Highlights

  • Runtime and Kubernetes Security Hardening: Implemented stricter security contexts for Docker and Kubernetes deployments, including running as a non-root user and dropping unnecessary capabilities.
  • Secure WebSocket Transport Enforcement: Introduced a new module to enforce secure transport (WSS/HTTPS) for non-local WebSocket endpoints, preventing insecure communication.
  • Python Subprocess and SQL Query Security: Eliminated unsafe subprocess execution patterns by explicitly disabling shell and ensuring SQL queries are properly parameterized to prevent injection attacks.
  • Frontend Rendering Security Enhancements: Addressed potential Cross-Site Scripting (XSS) vulnerabilities in Jinja and Vue rendering paths by using JSON serialization and DOM sanitization.
  • Dependency Updates: Upgraded several vulnerable dashboard dependencies to their latest secure versions.
  • New Security Regression Tests: Added dedicated unit tests to cover the newly implemented security-sensitive paths, ensuring ongoing protection against regressions.
Changelog
  • Dockerfile
    • Configured Docker image to run as a non-root user and added user/group management.
    • Switched the default user to 'astrbot' for running the application.
  • astrbot/core/computer/booters/local.py
    • Updated subprocess execution to explicitly use 'sys.executable' and disable shell for security.
  • astrbot/core/db/migration/sqlite_v3.py
    • Refactored SQL queries to use named parameters, preventing SQL injection.
  • astrbot/core/platform/sources/misskey/misskey_api.py
    • Integrated secure transport validation for Misskey WebSocket connections.
  • astrbot/core/platform/sources/satori/satori_adapter.py
    • Integrated secure transport validation for Satori WebSocket connections.
  • astrbot/core/platform/sources/websocket_security.py
    • Added new module for validating and transforming WebSocket URLs for secure transport.
  • astrbot/core/utils/t2i/network_strategy.py
    • Removed unnecessary backtick escaping in text-to-image rendering.
  • astrbot/core/utils/t2i/template/astrbot_powershell.html
    • Updated Jinja template to use 'tojson' filter for safe text rendering.
  • astrbot/core/utils/t2i/template/base.html
    • Updated Jinja template to use 'tojson' filter for safe text rendering.
  • dashboard/package.json
    • Updated multiple frontend dependencies and added pnpm overrides for specific packages.
  • dashboard/pnpm-lock.yaml
    • Updated locked versions of various dependencies and added pnpm overrides.
  • dashboard/src/components/chat/message_list_comps/IPythonToolBlock.vue
    • Imported 'DOMPurify' and applied sanitization to rendered code and results.
  • dashboard/src/components/shared/AstrBotConfigV4.vue
    • Imported 'DOMPurify' and applied sanitization to rendered markdown hints.
  • dashboard/src/components/shared/Logo.vue
    • Refactored logo title rendering to use computed properties and avoid 'v-html' for security.
  • docs/.vitepress/theme/components/ArticleShare.vue
    • Removed unused import and inlined SVG icons to enhance security and simplify rendering.
  • docs/en/platform/aiocqhttp/napcat.md
    • Updated documentation to recommend secure WebSocket connections (wss) for public deployments.
  • docs/scripts/upload_doc_images_to_r2.py
    • Modified 'subprocess.run' calls to explicitly disable shell execution.
  • docs/tests/test_upload_doc_images_to_r2.py
    • Added a new unit test to verify 'subprocess.run' is called with 'shell=False'.
  • docs/zh/platform/aiocqhttp/napcat.md
    • Updated Chinese documentation to recommend secure WebSocket connections (wss) for public deployments.
  • k8s/astrbot/02-deployment.yaml
    • Applied hardened security contexts for the pod and container.
  • k8s/astrbot_with_napcat/02-deployment.yaml
    • Applied hardened security contexts for the pod and containers, and pinned the busybox image version.
  • tests/unit/test_computer.py
    • Added a test to confirm subprocess execution uses a fixed Python executable and 'shell=False'.
  • tests/unit/test_sqlite_v3.py
    • Added new unit tests to verify SQL query parameterization.
  • tests/unit/test_t2i_security.py
    • Added new unit tests to verify T2I template security.
  • tests/unit/test_websocket_security.py
    • Added new unit tests for WebSocket security functions.
Ignored Files
  • Ignored by pattern: .github/workflows/** (4)
    • .github/workflows/build-docs.yml
    • .github/workflows/dashboard_ci.yml
    • .github/workflows/docker-image.yml
    • .github/workflows/release.yml
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@dosubot dosubot bot added area:core The bug / feature is about astrbot's core, backend area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. labels Mar 12, 2026
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 6 个安全问题和 1 个其他问题,并给出了一些总体反馈:

安全问题

  • 检测到未使用静态字符串的子进程函数 run。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 shlex.escape()。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
  • 检测到未使用静态字符串的子进程函数 run。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 shlex.escape()。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)

总体评论

  • _is_local_or_private_host 这个辅助函数会将所有不包含点号的主机名(例如 exampleprod)视为本地主机,这会削弱对 ws:// / http:// 的限制;建议收紧逻辑,只接受明确的 localhost/回环地址、内网 IP,或者一个定义清晰的主机名后缀允许列表。
  • IPythonToolBlock.vue 中,后备路径上同时进行手动 escapeHtmlDOMPurify.sanitize 是重复的;你可以简化为仅依赖 DOMPurify(直接传入原始的 code.value),或者保留手动转义但省略第二次 DOMPurify 清理步骤。
给 AI 代理的提示
请根据本次代码评审中的评论进行修改:

## 总体评论
- `_is_local_or_private_host` 这个辅助函数会将所有不包含点号的主机名(例如 `example``prod`)视为本地主机,这会削弱对 `ws://` / `http://` 的限制;建议收紧逻辑,只接受明确的 localhost/回环地址、内网 IP,或者一个定义清晰的主机名后缀允许列表。
-`IPythonToolBlock.vue` 中,后备路径上同时进行手动 `escapeHtml``DOMPurify.sanitize` 是重复的;你可以简化为仅依赖 DOMPurify(直接传入原始的 `code.value`),或者保留手动转义但省略第二次 DOMPurify 清理步骤。

## 单条评论

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="11-13" />
<code_context>
+_ALLOWED_INSECURE_SUFFIXES = (".local", ".internal")
+
+
+def _is_local_or_private_host(hostname: str | None) -> bool:
+    if not hostname:
+        return False
+
+    normalized = hostname.strip("[]").lower()
+    if normalized == "localhost" or "." not in normalized:
+        return True
+    if normalized.endswith(_ALLOWED_INSECURE_SUFFIXES):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** 请重新考虑将所有不带点号的主机名都视为本地。

`normalized == "localhost" or "." not in normalized` 会将任何裸主机名(例如 `"prod"``"api"`)视为本地,因此 `http://prod``ws://api` 可能会绕过必须使用安全传输的要求。在很多环境中,这类短主机名其实是真正的生产端点。建议把范围收缩到明确的 localhost 名称(`localhost``127.0.0.1``::1`)和私有 IP 网段,并删除 `"." not in normalized` 这个启发式逻辑。

```suggestion
    normalized = hostname.strip("[]").lower()
    if normalized == "localhost":
        return True
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到未使用静态字符串的子进程函数 `run`。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 `shlex.escape()`*Source: opengrep*
</issue_to_address>

### Comment 3
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。

*Source: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。

*Source: opengrep*
</issue_to_address>

### Comment 5
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到未使用静态字符串的子进程函数 `run`。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 `shlex.escape()`*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws``ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。

*Source: opengrep*
</issue_to_address>

Sourcery 对开源项目是免费的——如果你觉得这些评审有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English

Hey - I've found 6 security issues, 1 other issue, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)

General comments:

  • The _is_local_or_private_host helper treats any hostname without a dot (e.g. example, prod) as local, which weakens the ws:///http:// restriction; consider tightening this to only accept explicit localhost/loopback, private IPs, or a clearly delimited allowlist of host suffixes.
  • In IPythonToolBlock.vue, the manual escapeHtml plus DOMPurify.sanitize on the fallback path is redundant; you can simplify by either relying solely on DOMPurify (passing raw code.value) or keeping the manual escaping but skipping the second sanitization step.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `_is_local_or_private_host` helper treats any hostname without a dot (e.g. `example`, `prod`) as local, which weakens the `ws://`/`http://` restriction; consider tightening this to only accept explicit localhost/loopback, private IPs, or a clearly delimited allowlist of host suffixes.
- In `IPythonToolBlock.vue`, the manual `escapeHtml` plus `DOMPurify.sanitize` on the fallback path is redundant; you can simplify by either relying solely on DOMPurify (passing raw `code.value`) or keeping the manual escaping but skipping the second sanitization step.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="11-13" />
<code_context>
+_ALLOWED_INSECURE_SUFFIXES = (".local", ".internal")
+
+
+def _is_local_or_private_host(hostname: str | None) -> bool:
+    if not hostname:
+        return False
+
+    normalized = hostname.strip("[]").lower()
+    if normalized == "localhost" or "." not in normalized:
+        return True
+    if normalized.endswith(_ALLOWED_INSECURE_SUFFIXES):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Reconsider treating any hostname without a dot as local-only.

`normalized == "localhost" or "." not in normalized` treats any bare hostname (e.g. `"prod"`, `"api"`) as local, so `http://prod` or `ws://api` could skip the secure-transport requirement. In many setups, such short names are real production endpoints. Consider restricting this to explicit localhost names (`localhost`, `127.0.0.1`, `::1`) and private IP ranges, and removing the `"." not in normalized` heuristic.

```suggestion
    normalized = hostname.strip("[]").lower()
    if normalized == "localhost":
        return True
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 3
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 5
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws``ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

本次 PR 对整个项目进行了全面的安全加固,覆盖了后端、前端、部署等多个方面,质量非常高。主要改进包括:

  • 防止 SQL 注入:通过在所有数据库查询中使用绑定参数,彻底修复了不安全的 SQL 拼接问题。
  • 防范 XSS 攻击:在 Vue 组件和 Jinja2 模板中,移除了不安全的 v-html| safe 过滤器,并引入 DOMPurify| tojson 进行输出编码和净化,有效防止了跨站脚本攻击。
  • 强化运行时安全:为 Docker 容器和 Kubernetes Pod 添加了非 root 用户执行及严格的 securityContext 配置,遵循了最小权限原则。
  • 安全的子进程调用:修复了 Python 子进程调用,禁用了 shell 执行并避免了基于环境变量的命令覆盖风险。
  • 强制安全传输:对外部 WebSocket 连接强制使用 wss://,并增加了对本地非加密连接的兼容。
  • 供应链安全:更新了多个存在漏洞的前端依赖,并锁定了 CI 中使用的第三方 GitHub Actions 版本。
  • 全面的回归测试:为所有安全相关的修复都添加了对应的单元测试,确保了修复的有效性和长期稳定性。

整体而言,这是一次非常出色和彻底的安全重构,代码实现清晰、规范,显著提升了项目的整体安全性。没有发现需要修改的问题。

@zouyonghe
Copy link
Member Author

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 6 个安全问题、3 个其他问题,并留下了一些高层次的反馈:

安全问题

  • 发现对 subprocess 函数 run 的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接
  • 发现对 subprocess 函数 run 的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接

总体意见

  • to_websocket_url 中,建议对输入的 scheme 进行校验并抛出清晰的错误(而不是依赖 scheme_map 查找),这样像 ftp:// 这样的意外 scheme 会以明确的错误信息失败,而不是触发 KeyError
  • Dockerfile 中和 Kubernetes securityContext 中硬编码的 UID/GID 1000 未来可能会发生偏移;建议把这些值集中管理,或者添加简短注释,确保镜像用户与 Pod 的 runAsUser/fsGroup 保持同步。
给 AI Agents 的提示词
Please address the comments from this code review:

## Overall Comments
- In `to_websocket_url`, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes like `ftp://` fail with an explicit message instead of a `KeyError`.
- The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes `securityContext` may drift over time; consider centralizing these values or adding a brief comment to keep the image user and pod `runAsUser`/`fsGroup` in sync.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-54" />
<code_context>
+    return parsed
+
+
+def to_websocket_url(url: str) -> str:
+    parsed = urlsplit(url.rstrip("/"))
+    scheme_map = {
+        "http": "ws",
+        "https": "wss",
+        "ws": "ws",
+        "wss": "wss",
+    }
+    return urlunsplit(
+        parsed._replace(scheme=scheme_map[parsed.scheme]),
+    )
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Consider integrating scheme validation into `to_websocket_url` or narrowing its visibility to avoid unsafe future usages.

This helper currently trusts callers and will convert `http://example.com` to `ws://example.com`. In this PR, all callers validate first, but a future caller could use it on unvalidated input and bypass the secure-transport requirement.

To harden this:
- Either have `to_websocket_url` enforce the scheme itself (e.g. call `require_secure_transport_url` or assert an allowed scheme), or
- Make it clearly internal (e.g. `_to_websocket_url`) or otherwise document that it must only be used with pre-validated URLs.

This reduces the chance of accidentally weakening the security guarantees later.

Suggested implementation:

```python
    return parsed


def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
    # Enforce secure-transport requirements before converting schemes.
    parsed = require_secure_transport_url(url, label=label)

    scheme_map = {
        "http": "ws",
        "https": "wss",
        "ws": "ws",
        "wss": "wss",
    }

    try:
        ws_scheme = scheme_map[parsed.scheme]
    except KeyError as exc:
        raise ValueError(
            f"{label} must use http://, https://, ws:// or wss://: {url}",
        ) from exc

    return urlunsplit(parsed._replace(scheme=ws_scheme))

```

1. Ensure the validator used above is actually named `require_secure_transport_url` and is imported/in-scope in this module. If it has a different name or signature, adjust the call accordingly.
2. Since `to_websocket_url` now has an optional `label` keyword argument, callers that want custom error messages can pass it; existing callers will continue to work without changes, but if your codebase prefers not to expose `label` here, you can drop it and hard-code the label string inside the function.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="54-63" />
<code_context>
 class TestLocalPythonComponent:
     """Tests for LocalPythonComponent."""

+    @pytest.mark.asyncio
+    async def test_exec_uses_fixed_python_executable(self):
+        """Test Python execution ignores dynamic executable overrides."""
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.

The existing `test_streaming_client_rejects_remote_http_instance` covers the rejection path for insecure remote HTTP URLs. To fully exercise `require_secure_transport_url`, `to_websocket_url`, and `StreamingClient.connect`, please also add a positive-path test that:

- Uses a local or private URL (e.g. `http://localhost:8080` or `https://example.com`),
- Patches `websockets.connect` and asserts it is awaited, and
- Verifies the URL passed to `websockets.connect` matches the expected `ws://`/`wss://` value from `to_websocket_url`.
</issue_to_address>

### Comment 3
<location path="tests/unit/test_t2i_security.py" line_range="23-32" />
<code_context>
+    assert return_url is False
+
+
+def test_t2i_templates_use_json_serialization_for_text():
+    template_paths = [
+        Path("astrbot/core/utils/t2i/template/base.html"),
+        Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"),
+    ]
+
+    for template_path in template_paths:
+        content = template_path.read_text(encoding="utf-8")
+        assert "text | safe" not in content
+        assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.

This test enforces the key invariant that T2I templates use `text | tojson` instead of `text | safe`, but it only checks two specific files. Please update it to enumerate all `*.html` files under `astrbot/core/utils/t2i/template/` (e.g. via `Path(.../template).glob('*.html')`) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.
</issue_to_address>

### Comment 4
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 5
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 8
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws``ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 9
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

Sourcery 对开源项目是免费的 —— 如果你觉得这次 Review 有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的 Review。
Original comment in English

Hey - I've found 6 security issues, 3 other issues, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)

General comments:

  • In to_websocket_url, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes like ftp:// fail with an explicit message instead of a KeyError.
  • The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes securityContext may drift over time; consider centralizing these values or adding a brief comment to keep the image user and pod runAsUser/fsGroup in sync.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `to_websocket_url`, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes like `ftp://` fail with an explicit message instead of a `KeyError`.
- The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes `securityContext` may drift over time; consider centralizing these values or adding a brief comment to keep the image user and pod `runAsUser`/`fsGroup` in sync.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-54" />
<code_context>
+    return parsed
+
+
+def to_websocket_url(url: str) -> str:
+    parsed = urlsplit(url.rstrip("/"))
+    scheme_map = {
+        "http": "ws",
+        "https": "wss",
+        "ws": "ws",
+        "wss": "wss",
+    }
+    return urlunsplit(
+        parsed._replace(scheme=scheme_map[parsed.scheme]),
+    )
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Consider integrating scheme validation into `to_websocket_url` or narrowing its visibility to avoid unsafe future usages.

This helper currently trusts callers and will convert `http://example.com` to `ws://example.com`. In this PR, all callers validate first, but a future caller could use it on unvalidated input and bypass the secure-transport requirement.

To harden this:
- Either have `to_websocket_url` enforce the scheme itself (e.g. call `require_secure_transport_url` or assert an allowed scheme), or
- Make it clearly internal (e.g. `_to_websocket_url`) or otherwise document that it must only be used with pre-validated URLs.

This reduces the chance of accidentally weakening the security guarantees later.

Suggested implementation:

```python
    return parsed


def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
    # Enforce secure-transport requirements before converting schemes.
    parsed = require_secure_transport_url(url, label=label)

    scheme_map = {
        "http": "ws",
        "https": "wss",
        "ws": "ws",
        "wss": "wss",
    }

    try:
        ws_scheme = scheme_map[parsed.scheme]
    except KeyError as exc:
        raise ValueError(
            f"{label} must use http://, https://, ws:// or wss://: {url}",
        ) from exc

    return urlunsplit(parsed._replace(scheme=ws_scheme))

```

1. Ensure the validator used above is actually named `require_secure_transport_url` and is imported/in-scope in this module. If it has a different name or signature, adjust the call accordingly.
2. Since `to_websocket_url` now has an optional `label` keyword argument, callers that want custom error messages can pass it; existing callers will continue to work without changes, but if your codebase prefers not to expose `label` here, you can drop it and hard-code the label string inside the function.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="54-63" />
<code_context>
 class TestLocalPythonComponent:
     """Tests for LocalPythonComponent."""

+    @pytest.mark.asyncio
+    async def test_exec_uses_fixed_python_executable(self):
+        """Test Python execution ignores dynamic executable overrides."""
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.

The existing `test_streaming_client_rejects_remote_http_instance` covers the rejection path for insecure remote HTTP URLs. To fully exercise `require_secure_transport_url`, `to_websocket_url`, and `StreamingClient.connect`, please also add a positive-path test that:

- Uses a local or private URL (e.g. `http://localhost:8080` or `https://example.com`),
- Patches `websockets.connect` and asserts it is awaited, and
- Verifies the URL passed to `websockets.connect` matches the expected `ws://`/`wss://` value from `to_websocket_url`.
</issue_to_address>

### Comment 3
<location path="tests/unit/test_t2i_security.py" line_range="23-32" />
<code_context>
+    assert return_url is False
+
+
+def test_t2i_templates_use_json_serialization_for_text():
+    template_paths = [
+        Path("astrbot/core/utils/t2i/template/base.html"),
+        Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"),
+    ]
+
+    for template_path in template_paths:
+        content = template_path.read_text(encoding="utf-8")
+        assert "text | safe" not in content
+        assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.

This test enforces the key invariant that T2I templates use `text | tojson` instead of `text | safe`, but it only checks two specific files. Please update it to enumerate all `*.html` files under `astrbot/core/utils/t2i/template/` (e.g. via `Path(.../template).glob('*.html')`) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.
</issue_to_address>

### Comment 4
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 5
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 8
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws``ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 9
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +54 to +63
@pytest.mark.asyncio
async def test_streaming_client_rejects_remote_http_instance() -> None:
client = StreamingClient("http://example.com", "token")

with patch(
"astrbot.core.platform.sources.misskey.misskey_api.websockets.connect",
new_callable=AsyncMock,
) as mock_connect:
assert await client.connect() is False

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 建议添加一个 StreamingClient 的“正向路径”测试,用于验证安全/本地 URL 能够实际使用转换后的 WebSocket URL 建立连接。

当前的 test_streaming_client_rejects_remote_http_instance 已经覆盖了对不安全远程 HTTP URL 的拒绝路径。为了完整测试 require_secure_transport_urlto_websocket_urlStreamingClient.connect,请再添加一个正向路径测试,要求:

  • 使用本地或私有 URL(例如 http://localhost:8080https://example.com),
  • websockets.connect 做 patch,并断言其被 await 调用,
  • 验证传给 websockets.connect 的 URL 与 to_websocket_url 生成的期望 ws:///wss:// 值一致。
Original comment in English

suggestion (testing): Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.

The existing test_streaming_client_rejects_remote_http_instance covers the rejection path for insecure remote HTTP URLs. To fully exercise require_secure_transport_url, to_websocket_url, and StreamingClient.connect, please also add a positive-path test that:

  • Uses a local or private URL (e.g. http://localhost:8080 or https://example.com),
  • Patches websockets.connect and asserts it is awaited, and
  • Verifies the URL passed to websockets.connect matches the expected ws:///wss:// value from to_websocket_url.

Comment on lines +23 to +32
def test_t2i_templates_use_json_serialization_for_text():
template_paths = [
Path("astrbot/core/utils/t2i/template/base.html"),
Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"),
]

for template_path in template_paths:
content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert "text | tojson" in content
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 建议在测试中动态发现所有 T2I 模板,而不是硬编码固定的模板路径列表。

这个测试用于保证一个关键不变式:T2I 模板使用的是 text | tojson 而不是 text | safe,但它目前只检查了两个特定文件。请更新测试逻辑,枚举 astrbot/core/utils/t2i/template/ 目录下所有 *.html 文件(例如通过 Path(.../template).glob('*.html')),并对每个文件应用同样的断言,这样新模板会自动被覆盖,从而更好地防止安全回归。

Original comment in English

suggestion (testing): Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.

This test enforces the key invariant that T2I templates use text | tojson instead of text | safe, but it only checks two specific files. Please update it to enumerate all *.html files under astrbot/core/utils/t2i/template/ (e.g. via Path(.../template).glob('*.html')) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.

Comment on lines 114 to 120
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()

Source: opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

@zouyonghe
Copy link
Member Author

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 4 个安全问题,并给出了一些整体性的反馈:

安全问题

  • 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。(link)
  • 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()。(link)

总体评论

  • SQLiteDatabase.get_filtered_conversations 中,data_sql 会把 "FROM webchat_conversation" 直接与 where_sql 拼接(而 where_sql 已经不再包含前导空格),从而生成 FROM webchat_conversationWHERE ...。请在 WHERE 之前显式添加一个空格(例如 "FROM webchat_conversation " + where_sql),以避免生成无效的 SQL。
给 AI Agent 的提示
请根据本次代码评审中的评论进行修改:

## Overall Comments
-`SQLiteDatabase.get_filtered_conversations` 中,`data_sql` 会把 `"FROM webchat_conversation"` 直接与 `where_sql` 拼接(而 `where_sql` 已经不再包含前导空格),从而生成 `FROM webchat_conversationWHERE ...`。请在 `WHERE` 之前显式添加一个空格(例如 `"FROM webchat_conversation " + where_sql`),以避免生成无效的 SQL。

## Individual Comments

### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 `shlex.escape()`*来源: opengrep*
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。

*来源: opengrep*
</issue_to_address>

### Comment 3
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。

*来源: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 `shlex.escape()`*来源: opengrep*
</issue_to_address>

Sourcery 对开源项目是免费的 —— 如果你觉得我们的评审有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English

Hey - I've found 4 security issues, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • In SQLiteDatabase.get_filtered_conversations, data_sql concatenates "FROM webchat_conversation" directly with where_sql (which no longer includes a leading space), producing FROM webchat_conversationWHERE ...; add an explicit space before WHERE (e.g., "FROM webchat_conversation " + where_sql) to avoid generating invalid SQL.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, `data_sql` concatenates `"FROM webchat_conversation"` directly with `where_sql` (which no longer includes a leading space), producing `FROM webchat_conversationWHERE ...`; add an explicit space before `WHERE` (e.g., `"FROM webchat_conversation " + where_sql`) to avoid generating invalid SQL.

## Individual Comments

### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 3
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 114 to 120
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()

来源: opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

print(f"Uploading to: {target}")

subprocess.run(cmd, check=True)
subprocess.run(cmd, check=True, shell=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()

来源: opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

Copy link

@SourceryAI SourceryAI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 4 个安全问题、4 个其他问题,并给出了一些高层次反馈:

Security issues:

  • 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。(link)
  • 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。(link)
  • 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(link)

General comments:

  • 在 Kubernetes 部署清单中,主镜像(mlikiowa/napcat-docker:latestsoulter/astrbot:latest)仍然使用 latest 标签;可以考虑像你在其他地方那样,为了实现加固和可复现部署的目标,将它们固定到明确的版本号。
面向 AI Agent 的提示
Please address the comments from this code review:

## Overall Comments
- 在 Kubernetes 部署清单中,主镜像(`mlikiowa/napcat-docker:latest``soulter/astrbot:latest`)仍然使用 `latest` 标签;可以考虑像你在其他地方那样,为了实现加固和可复现部署的目标,将它们固定到明确的版本号。

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-55" />
<code_context>
+    return parsed
+
+
+def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
+    normalized_url = url.rstrip("/")
+    parsed = urlsplit(normalized_url)
+    allowed_schemes = {"http", "https", "ws", "wss"}
+
+    if parsed.scheme not in allowed_schemes:
+        raise ValueError(
+            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+        )
+
+    parsed = require_secure_transport_url(
+        normalized_url,
+        label=label,
+        allowed_schemes=allowed_schemes,
+    )
+    scheme_map = {
+        "http": "ws",
+        "https": "wss",
+        "ws": "ws",
+        "wss": "wss",
+    }
+
+    try:
+        ws_scheme = scheme_map[parsed.scheme]
+    except KeyError as exc:
+        raise ValueError(
+            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+        ) from exc
+
+    return urlunsplit(
+        parsed._replace(scheme=ws_scheme),
+    )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 避免从 WebSocket URL 中去除所有尾部斜杠,以保留路径语义。

`to_websocket_url` 当前使用 `url.rstrip("/")`,这会去除*所有*尾部斜杠,从而改变某些路径的含义(例如 `https://example.com/app/``https://example.com/app`)。如果目标只是去掉用户多输入的单个尾部斜杠,可以考虑使用更有针对性的方法:

```python
normalized_url = url[:-1] if url.endswith("/") else url
```

或者完全跳过这一步「规范化」,直接依赖 `urlsplit`/`urlunsplit`,将任何路径规范化的责任留给调用方。这样可以避免在尾部斜杠有语义意义的端点上篡改路径。
</issue_to_address>

### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="12-19" />
<code_context>
+)
+
+
+def test_require_secure_transport_url_allows_local_ws() -> None:
+    parsed = require_secure_transport_url(
+        "ws://localhost:5140/satori/v1/events",
+        label="Satori WebSocket URL",
+        allowed_schemes={"ws", "wss"},
+    )
+
+    assert parsed.scheme == "ws"
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** 在使用不安全 ws:// 时考虑为私有 IP 和本地域名添加测试覆盖

当前已经覆盖了 `ws://localhost`。由于 `_is_local_or_private_host` 也接受私有/环回 IP 和内部域名后缀,请添加诸如 `ws://127.0.0.1/...``ws://10.0.0.5/...``ws://service.internal/...` 等案例,以锁定预期的允许列表,并防止主机分类逻辑回归。
</issue_to_address>

### Comment 3
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+    def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**suggestion (testing):** 考虑为无过滤条件的查询以及 `get_filtered_conversations` 中包含多个列表项的情况添加测试

为了完善覆盖率:

- 增加一个所有过滤参数都为空/None 的用例,并断言 `count_sql`/`data_sql` 中不存在 `WHERE` 子句,同时参数仅包含 `page_size``offset`- 增加一个包含多个 `platforms` 和/或 `message_types` 条目的用例,以验证会生成不同的参数名(例如 `:platform_0``:platform_1`),并且这些参数在 SQL 中被正确引用。

这将更好地防止查询构建逻辑的回归。

建议实现:

```python
    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query="x' OR 1=1 --",
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Ensure two queries (count + data) are executed with named parameters
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Basic contract: queries use named parameters and do not interpolate the search_query
        assert ":" in count_sql
        assert ":" in data_sql

        # Dangerous search_query should not appear directly in the SQL but only in bound params
        assert "x' OR 1=1 --" not in count_sql
        assert "x' OR 1=1 --" not in data_sql
        assert any(v == "x' OR 1=1 --" for v in count_params.values())
        assert any(v == "x' OR 1=1 --" for v in data_params.values())

    def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=25,
            platforms=None,
            message_types=None,
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        # We should still execute the standard count + data queries
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        normalized_count_sql = " ".join(count_sql.split()).upper()
        normalized_data_sql = " ".join(data_sql.split()).upper()

        # No WHERE clause should be present when no filters are provided
        assert " WHERE " not in normalized_count_sql
        assert " WHERE " not in normalized_data_sql

        # Params for the data query should only include pagination
        assert "page_size" in data_params
        assert "offset" in data_params
        assert len(data_params) == 2

        # Count query may or may not use page_size; it must not include filter params
        forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
        assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())

    def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=10,
            platforms=["qq", "slack"],
            message_types=["group", "private"],
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Ensure multiple distinct parameter names are generated for list filters
        platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
        message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}

        # At least two distinct params for each list
        assert len(platform_param_keys) >= 2
        assert len(message_type_param_keys) >= 2

        # All generated parameter names should appear in the SQL
        for key in platform_param_keys | message_type_param_keys:
            assert f":{key}" in data_sql

        # And none of them are accidentally reused in a conflicting way
        assert platform_param_keys.isdisjoint(message_type_param_keys)

```

这些测试假设:

1. `get_filtered_conversations` 通过 `cursor.execute` 恰好执行两条查询:先是计数查询,然后是数据查询。
2. 唯一的非过滤参数是 `page_size``offset`3. 列表过滤参数的名称前缀分别为 `platform_``message_type_`。

如果你的实现不同,请相应调整:
- `get_filtered_conversations` 调用中的参数列表(按需增加/删除关键字参数)。
- 针对 `count_params`/`data_params` 键的断言,以及参数名前缀(`platform_``message_type_`)以匹配你查询构造器的命名方案。
- 如果方法发出的查询方式不同,则调整对 `cursor.execute.call_count` 以及调用顺序的预期。
</issue_to_address>

### Comment 4
<location path="tests/unit/test_t2i_security.py" line_range="31-33" />
<code_context>
+    assert template_paths
+
+    for template_path in template_paths:
+        content = template_path.read_text(encoding="utf-8")
+        assert "text | safe" not in content
+        assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** 让模板安全性断言更具体地针对文本占位符,以避免误报

因为此检查只搜索子串 `"text | tojson"`,如果该文本出现在注释或非用户可控的值上,也有可能通过测试。为了更好地体现测试意图,可以改为匹配特定的模板占位符(例如通过正则匹配 `{{\s*text\s*\|\s*tojson\s*}}`),从而确保测试验证的是用户提供的 `text` 在每个模板中都经过安全序列化。

建议实现:

```python
import re


def test_t2i_templates_use_json_serialization_for_text():

```

```python
        content = template_path.read_text(encoding="utf-8")
        assert "text | safe" not in content
        assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None

```
</issue_to_address>

### Comment 5
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对 subprocess 函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 `shlex.escape()`*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。

*Source: opengrep*
</issue_to_address>

### Comment 8
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对 subprocess 函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 `shlex.escape()`*Source: opengrep*
</issue_to_address>

Hi @zouyonghe! 👋

感谢你通过评论 @sourcery-ai review 来试用 Sourcery!🚀

安装 sourcery-ai bot,即可在每个 pull request 上自动获取代码审查 ✨

帮我变得更有用!请在每条评论上点击 👍 或 👎,我会利用你的反馈来改进后续的审查。
Original comment in English

Hey - I've found 4 security issues, 4 other issues, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • In the Kubernetes deployment manifests, the main images (mlikiowa/napcat-docker:latest and soulter/astrbot:latest) are still using the latest tag; consider pinning them to explicit versions to match the hardened, reproducible deployment goal you applied elsewhere.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In the Kubernetes deployment manifests, the main images (`mlikiowa/napcat-docker:latest` and `soulter/astrbot:latest`) are still using the `latest` tag; consider pinning them to explicit versions to match the hardened, reproducible deployment goal you applied elsewhere.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-55" />
<code_context>
+    return parsed
+
+
+def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
+    normalized_url = url.rstrip("/")
+    parsed = urlsplit(normalized_url)
+    allowed_schemes = {"http", "https", "ws", "wss"}
+
+    if parsed.scheme not in allowed_schemes:
+        raise ValueError(
+            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+        )
+
+    parsed = require_secure_transport_url(
+        normalized_url,
+        label=label,
+        allowed_schemes=allowed_schemes,
+    )
+    scheme_map = {
+        "http": "ws",
+        "https": "wss",
+        "ws": "ws",
+        "wss": "wss",
+    }
+
+    try:
+        ws_scheme = scheme_map[parsed.scheme]
+    except KeyError as exc:
+        raise ValueError(
+            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+        ) from exc
+
+    return urlunsplit(
+        parsed._replace(scheme=ws_scheme),
+    )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid stripping all trailing slashes from WebSocket URLs to preserve path semantics.

`to_websocket_url` currently uses `url.rstrip("/")`, which strips *all* trailing slashes and can change the meaning of some paths (e.g. `https://example.com/app/` vs `https://example.com/app`). If the goal is only to trim a single user-added slash, consider a more targeted approach:

```python
normalized_url = url[:-1] if url.endswith("/") else url
```

or skip this normalization entirely and rely on `urlsplit`/`urlunsplit`, leaving any path normalization to callers. This avoids altering endpoints where a trailing slash is significant.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="12-19" />
<code_context>
+)
+
+
+def test_require_secure_transport_url_allows_local_ws() -> None:
+    parsed = require_secure_transport_url(
+        "ws://localhost:5140/satori/v1/events",
+        label="Satori WebSocket URL",
+        allowed_schemes={"ws", "wss"},
+    )
+
+    assert parsed.scheme == "ws"
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding coverage for private IPs and local hostnames when using insecure ws://

This already covers `ws://localhost`. Since `_is_local_or_private_host` also accepts private/loopback IPs and internal suffixes, please add cases like `ws://127.0.0.1/...`, `ws://10.0.0.5/...`, and `ws://service.internal/...` to lock in the intended allowlist and prevent regressions in host classification.
</issue_to_address>

### Comment 3
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+    def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for no-filter queries and multiple list entries in get_filtered_conversations

To round out coverage:

- Add a case where all filter arguments are empty/None and assert that `count_sql`/`data_sql` have no `WHERE` clause and the params only include `page_size` and `offset`.
- Add a case with multiple `platforms` and/or `message_types` entries to verify distinct parameter names (e.g. `:platform_0`, `:platform_1`) are generated and correctly referenced in the SQL.

This will better protect against regressions in the query-building logic.

Suggested implementation:

```python
    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query="x' OR 1=1 --",
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Ensure two queries (count + data) are executed with named parameters
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Basic contract: queries use named parameters and do not interpolate the search_query
        assert ":" in count_sql
        assert ":" in data_sql

        # Dangerous search_query should not appear directly in the SQL but only in bound params
        assert "x' OR 1=1 --" not in count_sql
        assert "x' OR 1=1 --" not in data_sql
        assert any(v == "x' OR 1=1 --" for v in count_params.values())
        assert any(v == "x' OR 1=1 --" for v in data_params.values())

    def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=25,
            platforms=None,
            message_types=None,
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        # We should still execute the standard count + data queries
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        normalized_count_sql = " ".join(count_sql.split()).upper()
        normalized_data_sql = " ".join(data_sql.split()).upper()

        # No WHERE clause should be present when no filters are provided
        assert " WHERE " not in normalized_count_sql
        assert " WHERE " not in normalized_data_sql

        # Params for the data query should only include pagination
        assert "page_size" in data_params
        assert "offset" in data_params
        assert len(data_params) == 2

        # Count query may or may not use page_size; it must not include filter params
        forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
        assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())

    def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=10,
            platforms=["qq", "slack"],
            message_types=["group", "private"],
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Ensure multiple distinct parameter names are generated for list filters
        platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
        message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}

        # At least two distinct params for each list
        assert len(platform_param_keys) >= 2
        assert len(message_type_param_keys) >= 2

        # All generated parameter names should appear in the SQL
        for key in platform_param_keys | message_type_param_keys:
            assert f":{key}" in data_sql

        # And none of them are accidentally reused in a conflicting way
        assert platform_param_keys.isdisjoint(message_type_param_keys)

```

These tests assume:

1. `get_filtered_conversations` executes exactly two queries via `cursor.execute`: a count query first, then a data query.
2. The only non-filter parameters are `page_size` and `offset`.
3. Parameter names for list filters are prefixed as `platform_` and `message_type_`.

If your implementation differs, adjust:
- The argument list in the `get_filtered_conversations` calls (add/remove keyword arguments as needed).
- The assertions on `count_params`/`data_params` keys and the param-name prefixes (`platform_`, `message_type_`) to match the actual naming scheme used by your query builder.
- The expectations on `cursor.execute.call_count` and call order if the method issues queries differently.
</issue_to_address>

### Comment 4
<location path="tests/unit/test_t2i_security.py" line_range="31-33" />
<code_context>
+    assert template_paths
+
+    for template_path in template_paths:
+        content = template_path.read_text(encoding="utf-8")
+        assert "text | safe" not in content
+        assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Make the template safety assertion more specific to the text placeholder to avoid false positives

Because this check only looks for the substring `"text | tojson"`, it could pass if that text appears in a comment or for a non-user-controlled value. To better enforce the intent, match the specific template placeholder instead (e.g., via regex for `{{\s*text\s*\|\s*tojson\s*}}`) so the test confirms the user-supplied `text` is safely serialized in each template.

Suggested implementation:

```python
import re


def test_t2i_templates_use_json_serialization_for_text():

```

```python
        content = template_path.read_text(encoding="utf-8")
        assert "text | safe" not in content
        assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None

```
</issue_to_address>

### Comment 5
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
            f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

*Source: opengrep*
</issue_to_address>

### Comment 8
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Hi @zouyonghe! 👋

Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀

Install the sourcery-ai bot to get automatic code reviews on every pull request ✨

Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +46 to +55
def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
normalized_url = url.rstrip("/")
parsed = urlsplit(normalized_url)
allowed_schemes = {"http", "https", "ws", "wss"}

if parsed.scheme not in allowed_schemes:
raise ValueError(
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 避免从 WebSocket URL 中去除所有尾部斜杠,以保留路径语义。

to_websocket_url 当前使用 url.rstrip("/"),这会去除所有尾部斜杠,从而改变某些路径的含义(例如 https://example.com/app/https://example.com/app)。如果目标只是去掉用户多输入的单个尾部斜杠,可以考虑使用更有针对性的方法:

normalized_url = url[:-1] if url.endswith("/") else url

或者完全跳过这一步「规范化」,直接依赖 urlsplit/urlunsplit,将任何路径规范化的责任留给调用方。这样可以避免在尾部斜杠有语义意义的端点上篡改路径。

Original comment in English

suggestion (bug_risk): Avoid stripping all trailing slashes from WebSocket URLs to preserve path semantics.

to_websocket_url currently uses url.rstrip("/"), which strips all trailing slashes and can change the meaning of some paths (e.g. https://example.com/app/ vs https://example.com/app). If the goal is only to trim a single user-added slash, consider a more targeted approach:

normalized_url = url[:-1] if url.endswith("/") else url

or skip this normalization entirely and rely on urlsplit/urlunsplit, leaving any path normalization to callers. This avoids altering endpoints where a trailing slash is significant.

Comment on lines +12 to +19
def test_require_secure_transport_url_allows_local_ws() -> None:
parsed = require_secure_transport_url(
"ws://localhost:5140/satori/v1/events",
label="Satori WebSocket URL",
allowed_schemes={"ws", "wss"},
)

assert parsed.scheme == "ws"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 在使用不安全 ws:// 时考虑为私有 IP 和本地域名添加测试覆盖

当前已经覆盖了 ws://localhost。由于 _is_local_or_private_host 也接受私有/环回 IP 和内部域名后缀,请添加诸如 ws://127.0.0.1/...ws://10.0.0.5/...ws://service.internal/... 等案例,以锁定预期的允许列表,并防止主机分类逻辑回归。

Original comment in English

suggestion (testing): Consider adding coverage for private IPs and local hostnames when using insecure ws://

This already covers ws://localhost. Since _is_local_or_private_host also accepts private/loopback IPs and internal suffixes, please add cases like ws://127.0.0.1/..., ws://10.0.0.5/..., and ws://service.internal/... to lock in the intended allowlist and prevent regressions in host classification.

Comment on lines +44 to +53
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()

db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 考虑为无过滤条件的查询以及 get_filtered_conversations 中包含多个列表项的情况添加测试

为了完善覆盖率:

  • 增加一个所有过滤参数都为空/None 的用例,并断言 count_sql/data_sql 中不存在 WHERE 子句,同时参数仅包含 page_sizeoffset
  • 增加一个包含多个 platforms 和/或 message_types 条目的用例,以验证会生成不同的参数名(例如 :platform_0:platform_1),并且这些参数在 SQL 中被正确引用。

这将更好地防止查询构建逻辑的回归。

建议实现:

    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query="x' OR 1=1 --",
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Ensure two queries (count + data) are executed with named parameters
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Basic contract: queries use named parameters and do not interpolate the search_query
        assert ":" in count_sql
        assert ":" in data_sql

        # Dangerous search_query should not appear directly in the SQL but only in bound params
        assert "x' OR 1=1 --" not in count_sql
        assert "x' OR 1=1 --" not in data_sql
        assert any(v == "x' OR 1=1 --" for v in count_params.values())
        assert any(v == "x' OR 1=1 --" for v in data_params.values())

    def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=25,
            platforms=None,
            message_types=None,
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        # We should still execute the standard count + data queries
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        normalized_count_sql = " ".join(count_sql.split()).upper()
        normalized_data_sql = " ".join(data_sql.split()).upper()

        # No WHERE clause should be present when no filters are provided
        assert " WHERE " not in normalized_count_sql
        assert " WHERE " not in normalized_data_sql

        # Params for the data query should only include pagination
        assert "page_size" in data_params
        assert "offset" in data_params
        assert len(data_params) == 2

        # Count query may or may not use page_size; it must not include filter params
        forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
        assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())

    def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=10,
            platforms=["qq", "slack"],
            message_types=["group", "private"],
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Ensure multiple distinct parameter names are generated for list filters
        platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
        message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}

        # At least two distinct params for each list
        assert len(platform_param_keys) >= 2
        assert len(message_type_param_keys) >= 2

        # All generated parameter names should appear in the SQL
        for key in platform_param_keys | message_type_param_keys:
            assert f":{key}" in data_sql

        # And none of them are accidentally reused in a conflicting way
        assert platform_param_keys.isdisjoint(message_type_param_keys)

这些测试假设:

  1. get_filtered_conversations 通过 cursor.execute 恰好执行两条查询:先是计数查询,然后是数据查询。
  2. 唯一的非过滤参数是 page_sizeoffset
  3. 列表过滤参数的名称前缀分别为 platform_message_type_

如果你的实现不同,请相应调整:

  • get_filtered_conversations 调用中的参数列表(按需增加/删除关键字参数)。
  • 针对 count_params/data_params 键的断言,以及参数名前缀(platform_message_type_)以匹配你查询构造器的命名方案。
  • 如果方法发出的查询方式不同,则调整对 cursor.execute.call_count 以及调用顺序的预期。
Original comment in English

suggestion (testing): Consider adding tests for no-filter queries and multiple list entries in get_filtered_conversations

To round out coverage:

  • Add a case where all filter arguments are empty/None and assert that count_sql/data_sql have no WHERE clause and the params only include page_size and offset.
  • Add a case with multiple platforms and/or message_types entries to verify distinct parameter names (e.g. :platform_0, :platform_1) are generated and correctly referenced in the SQL.

This will better protect against regressions in the query-building logic.

Suggested implementation:

    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query="x' OR 1=1 --",
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Ensure two queries (count + data) are executed with named parameters
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Basic contract: queries use named parameters and do not interpolate the search_query
        assert ":" in count_sql
        assert ":" in data_sql

        # Dangerous search_query should not appear directly in the SQL but only in bound params
        assert "x' OR 1=1 --" not in count_sql
        assert "x' OR 1=1 --" not in data_sql
        assert any(v == "x' OR 1=1 --" for v in count_params.values())
        assert any(v == "x' OR 1=1 --" for v in data_params.values())

    def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=25,
            platforms=None,
            message_types=None,
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        # We should still execute the standard count + data queries
        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        normalized_count_sql = " ".join(count_sql.split()).upper()
        normalized_data_sql = " ".join(data_sql.split()).upper()

        # No WHERE clause should be present when no filters are provided
        assert " WHERE " not in normalized_count_sql
        assert " WHERE " not in normalized_data_sql

        # Params for the data query should only include pagination
        assert "page_size" in data_params
        assert "offset" in data_params
        assert len(data_params) == 2

        # Count query may or may not use page_size; it must not include filter params
        forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
        assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())

    def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
        db, cursor = self._create_db_with_mock_cursor()

        db.get_filtered_conversations(
            page=1,
            page_size=10,
            platforms=["qq", "slack"],
            message_types=["group", "private"],
            search_query=None,
            exclude_ids=None,
            exclude_platforms=None,
        )

        assert cursor.execute.call_count == 2
        count_sql, count_params = cursor.execute.call_args_list[0].args
        data_sql, data_params = cursor.execute.call_args_list[1].args

        # Ensure multiple distinct parameter names are generated for list filters
        platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
        message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}

        # At least two distinct params for each list
        assert len(platform_param_keys) >= 2
        assert len(message_type_param_keys) >= 2

        # All generated parameter names should appear in the SQL
        for key in platform_param_keys | message_type_param_keys:
            assert f":{key}" in data_sql

        # And none of them are accidentally reused in a conflicting way
        assert platform_param_keys.isdisjoint(message_type_param_keys)

These tests assume:

  1. get_filtered_conversations executes exactly two queries via cursor.execute: a count query first, then a data query.
  2. The only non-filter parameters are page_size and offset.
  3. Parameter names for list filters are prefixed as platform_ and message_type_.

If your implementation differs, adjust:

  • The argument list in the get_filtered_conversations calls (add/remove keyword arguments as needed).
  • The assertions on count_params/data_params keys and the param-name prefixes (platform_, message_type_) to match the actual naming scheme used by your query builder.
  • The expectations on cursor.execute.call_count and call order if the method issues queries differently.

Comment on lines +31 to +33
content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert "text | tojson" in content

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 让模板安全性断言更具体地针对文本占位符,以避免误报

因为此检查只搜索子串 "text | tojson",如果该文本出现在注释或非用户可控的值上,也有可能通过测试。为了更好地体现测试意图,可以改为匹配特定的模板占位符(例如通过正则匹配 {{\s*text\s*\|\s*tojson\s*}}),从而确保测试验证的是用户提供的 text 在每个模板中都经过安全序列化。

建议实现:

import re


def test_t2i_templates_use_json_serialization_for_text():
        content = template_path.read_text(encoding="utf-8")
        assert "text | safe" not in content
        assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None
Original comment in English

suggestion (testing): Make the template safety assertion more specific to the text placeholder to avoid false positives

Because this check only looks for the substring "text | tojson", it could pass if that text appears in a comment or for a non-user-controlled value. To better enforce the intent, match the specific template placeholder instead (e.g., via regex for {{\s*text\s*\|\s*tojson\s*}}) so the test confirms the user-supplied text is safely serialized in each template.

Suggested implementation:

import re


def test_t2i_templates_use_json_serialization_for_text():
        content = template_path.read_text(encoding="utf-8")
        assert "text | safe" not in content
        assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None

Comment on lines 114 to 120
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()

Source: opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep


if parsed.scheme not in allowed_schemes:
raise ValueError(
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (javascript.lang.security.detect-insecure-websocket): 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。

Source: opengrep

Original comment in English

security (javascript.lang.security.detect-insecure-websocket): Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

Source: opengrep

ws_scheme = scheme_map[parsed.scheme]
except KeyError as exc:
raise ValueError(
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (javascript.lang.security.detect-insecure-websocket): 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。

Source: opengrep

Original comment in English

security (javascript.lang.security.detect-insecure-websocket): Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.

Source: opengrep

print(f"Uploading to: {target}")

subprocess.run(cmd, check=True)
subprocess.run(cmd, check=True, shell=False)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()

Source: opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

@zouyonghe
Copy link
Member Author

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 2 个安全问题、2 个其他问题,并留下了一些整体反馈:

安全问题

  • 检测到在使用子进程函数 run 时没有使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接
  • 检测到在使用子进程函数 run 时没有使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接

总体评论

  • SQLiteDatabase.get_filtered_conversations 中,当前 data_sql 的字符串拼接会生成 FROM webchat_conversationWHERE ...(在 WHERE 之前缺少一个空格),这会在运行时导致查询失败;请在 FROM webchat_conversation 与插值的 where_sql 之间显式添加一个空格或换行。
提供给 AI Agents 的提示词
Please address the comments from this code review:

## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, the `data_sql` string concatenation currently produces `FROM webchat_conversationWHERE ...` (missing a space before `WHERE`), which will break the query at runtime; add an explicit space or newline between `FROM webchat_conversation` and the interpolated `where_sql`.

## Individual Comments

### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14" />
<code_context>
     steps:
       - name: checkout
-        uses: actions/checkout@master
+        uses: actions/checkout@v6
       - name: nodejs installation
         uses: actions/setup-node@v6
</code_context>
<issue_to_address>
**issue (bug_risk):** The `actions/checkout@v6` tag does not currently exist and will cause workflow failures.

`actions/checkout` is only available up to `v4`. Please update this to `actions/checkout@v4` (or a specific `v4.x.y`), consistent with how the other actions are pinned, to avoid runtime resolution errors.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+    def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Strengthen assertions to ensure raw search input never appears in the data query SQL

This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for `data_sql` (e.g. `assert "x' OR 1=1 --" not in data_sql`) so both queries are covered for SQL injection hardening.

Suggested implementation:

```python
    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        unsafe_search = "x' OR 1=1 --"

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query=unsafe_search,
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Expect two queries: a count(*) query and a data query
        assert cursor.execute.call_count == 2

        (count_sql, count_params), (data_sql, data_params) = [
            call.args for call in cursor.execute.call_args_list
        ]

        # Unsafe input must only appear in bound parameters for both queries,
        # never interpolated into the raw SQL strings.
        assert unsafe_search in count_params.values()
        assert unsafe_search not in count_sql

        assert unsafe_search in data_params.values()
        assert unsafe_search not in data_sql

```

If the existing test already unpacks `cursor.execute.call_args_list` or asserts on `count_sql`/`count_params`, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (`*_sql`) does not contain the unsafe search string, while the parameters (`*_params`) do contain it.
</issue_to_address>

### Comment 3
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery 对开源项目免费——如果你觉得我们的 Review 有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据这些反馈改进以后的 Review。
Original comment in English

Hey - I've found 2 security issues, 2 other issues, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • In SQLiteDatabase.get_filtered_conversations, the data_sql string concatenation currently produces FROM webchat_conversationWHERE ... (missing a space before WHERE), which will break the query at runtime; add an explicit space or newline between FROM webchat_conversation and the interpolated where_sql.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, the `data_sql` string concatenation currently produces `FROM webchat_conversationWHERE ...` (missing a space before `WHERE`), which will break the query at runtime; add an explicit space or newline between `FROM webchat_conversation` and the interpolated `where_sql`.

## Individual Comments

### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14" />
<code_context>
     steps:
       - name: checkout
-        uses: actions/checkout@master
+        uses: actions/checkout@v6
       - name: nodejs installation
         uses: actions/setup-node@v6
</code_context>
<issue_to_address>
**issue (bug_risk):** The `actions/checkout@v6` tag does not currently exist and will cause workflow failures.

`actions/checkout` is only available up to `v4`. Please update this to `actions/checkout@v4` (or a specific `v4.x.y`), consistent with how the other actions are pinned, to avoid runtime resolution errors.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+    def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Strengthen assertions to ensure raw search input never appears in the data query SQL

This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for `data_sql` (e.g. `assert "x' OR 1=1 --" not in data_sql`) so both queries are covered for SQL injection hardening.

Suggested implementation:

```python
    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        unsafe_search = "x' OR 1=1 --"

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query=unsafe_search,
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Expect two queries: a count(*) query and a data query
        assert cursor.execute.call_count == 2

        (count_sql, count_params), (data_sql, data_params) = [
            call.args for call in cursor.execute.call_args_list
        ]

        # Unsafe input must only appear in bound parameters for both queries,
        # never interpolated into the raw SQL strings.
        assert unsafe_search in count_params.values()
        assert unsafe_search not in count_sql

        assert unsafe_search in data_params.values()
        assert unsafe_search not in data_sql

```

If the existing test already unpacks `cursor.execute.call_args_list` or asserts on `count_sql`/`count_params`, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (`*_sql`) does not contain the unsafe search string, while the parameters (`*_params`) do contain it.
</issue_to_address>

### Comment 3
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

steps:
- name: checkout
uses: actions/checkout@master
uses: actions/checkout@v6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): actions/checkout@v6 这个标签目前并不存在,会导致工作流失败。

actions/checkout 目前最高只到 v4。请将其更新为 actions/checkout@v4(或具体的 v4.x.y 版本),并与其他已固定版本的 actions 保持一致,以避免运行时解析错误。

Original comment in English

issue (bug_risk): The actions/checkout@v6 tag does not currently exist and will cause workflow failures.

actions/checkout is only available up to v4. Please update this to actions/checkout@v4 (or a specific v4.x.y), consistent with how the other actions are pinned, to avoid runtime resolution errors.

Comment on lines +44 to +53
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()

db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): 加强断言,确保原始搜索输入永远不会出现在数据查询的 SQL 中。

目前已经确认了不安全输入只会出现在计数查询(count query)的绑定参数中。请为 data_sql 添加等效的断言(例如 assert "x' OR 1=1 --" not in data_sql),从而让这两个查询都覆盖到 SQL 注入加固场景。

建议实现:

    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        unsafe_search = "x' OR 1=1 --"

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query=unsafe_search,
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Expect two queries: a count(*) query and a data query
        assert cursor.execute.call_count == 2

        (count_sql, count_params), (data_sql, data_params) = [
            call.args for call in cursor.execute.call_args_list
        ]

        # Unsafe input must only appear in bound parameters for both queries,
        # never interpolated into the raw SQL strings.
        assert unsafe_search in count_params.values()
        assert unsafe_search not in count_sql

        assert unsafe_search in data_params.values()
        assert unsafe_search not in data_sql

如果现有测试已经解包了 cursor.execute.call_args_list,或者已经对 count_sql / count_params 做了断言,你可以考虑把这些已有断言合并进上述重写后的测试体,而不是完全替换掉。关键要求是:对这两个查询(count 和 data),都要断言原始 SQL(*_sql)不包含不安全的搜索字符串,同时参数(*_params)中必须包含该字符串。

Original comment in English

🚨 suggestion (security): Strengthen assertions to ensure raw search input never appears in the data query SQL

This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for data_sql (e.g. assert "x' OR 1=1 --" not in data_sql) so both queries are covered for SQL injection hardening.

Suggested implementation:

    def test_get_filtered_conversations_uses_named_parameters(self):
        db, cursor = self._create_db_with_mock_cursor()

        unsafe_search = "x' OR 1=1 --"

        db.get_filtered_conversations(
            page=2,
            page_size=10,
            platforms=["qq"],
            message_types=["group"],
            search_query=unsafe_search,
            exclude_ids=["admin"],
            exclude_platforms=["slack"],
        )

        # Expect two queries: a count(*) query and a data query
        assert cursor.execute.call_count == 2

        (count_sql, count_params), (data_sql, data_params) = [
            call.args for call in cursor.execute.call_args_list
        ]

        # Unsafe input must only appear in bound parameters for both queries,
        # never interpolated into the raw SQL strings.
        assert unsafe_search in count_params.values()
        assert unsafe_search not in count_sql

        assert unsafe_search in data_params.values()
        assert unsafe_search not in data_sql

If the existing test already unpacks cursor.execute.call_args_list or asserts on count_sql/count_params, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (*_sql) does not contain the unsafe search string, while the parameters (*_params) do contain it.

Comment on lines 116 to 122
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计该调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()

来源:opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

subprocess.run(cmd, check=True)
# nosemgrep: python.lang.security.audit.dangerous-subprocess-use-audit
# Uses an argv list with shell=False after checking that rclone exists in PATH.
subprocess.run(cmd, check=True, shell=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计该调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()

来源:opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

@zouyonghe
Copy link
Member Author

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 2 个安全问题

安全问题

  • 检测到在使用子进程函数 run 时没有使用静态字符串。如果这些数据可以被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用方式,以确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接
  • 检测到在使用子进程函数 run 时没有使用静态字符串。如果这些数据可以被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用方式,以确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。(链接
供 AI 代理使用的提示词
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 2
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery 对开源项目是免费的——如果你觉得我们的评审有帮助,请考虑分享给更多人 ✨
帮我变得更有用!请对每条评论点击 👍 或 👎,我会根据你的反馈改进后续的评审。
Original comment in English

Hey - I've found 2 security issues

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 2
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@zouyonghe
Copy link
Member Author

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 2 个安全问题和 1 个其他问题。

安全问题

  • 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()。(link)
  • 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()。(link)
面向 AI Agent 的提示词
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14-16" />
<code_context>
     steps:
       - name: checkout
-        uses: actions/checkout@master
+        uses: actions/checkout@v6
       - name: nodejs installation
         uses: actions/setup-node@v6
         with:
</code_context>
<issue_to_address>
**issue (bug_risk):** 固定的 GitHub Actions 版本 `actions/checkout@v6``actions/setup-node@v6` 很可能尚不存在,会导致工作流中断。

目前最新发布的主版本是 `actions/checkout@v4``actions/setup-node@v4`。使用 `@v6` 将会导致工作流在运行时失败。请固定到已存在的版本(例如 `@v4` 或具体的 `@v4.x.y`),以在保持版本显式可控的同时确保工作流可正常运行。
</issue_to_address>

### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 `shlex.escape()`*来源:opengrep*
</issue_to_address>

### Comment 3
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 `shlex.escape()`*来源:opengrep*
</issue_to_address>

Sourcery 对开源项目免费——如果你觉得我们的评审有帮助,请考虑分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English

Hey - I've found 2 security issues, and 1 other issue

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14-16" />
<code_context>
     steps:
       - name: checkout
-        uses: actions/checkout@master
+        uses: actions/checkout@v6
       - name: nodejs installation
         uses: actions/setup-node@v6
         with:
</code_context>
<issue_to_address>
**issue (bug_risk):** Pinned GitHub Actions versions `actions/checkout@v6` and `actions/setup-node@v6` likely do not exist yet and will break the workflow.

The latest published majors are currently `actions/checkout@v4` and `actions/setup-node@v4`. Using `@v6` will cause the workflow to fail at runtime. Please pin to existing versions (e.g. `@v4` or a specific `@v4.x.y`) to keep the workflow functional while remaining explicit about versions.
</issue_to_address>

### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
                result = subprocess.run(
                    [sys.executable, "-c", code],
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    shell=False,
                )
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 3
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
        subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 116 to 122
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()

来源:opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

subprocess.run(cmd, check=True)
# nosemgrep: python.lang.security.audit.dangerous-subprocess-use-audit
# Uses an argv list with shell=False after checking that rclone exists in PATH.
subprocess.run(cmd, check=True, shell=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()

来源:opengrep

Original comment in English

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:XL This PR changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants