fix: harden runtime and supply chain security#6116
fix: harden runtime and supply chain security#6116zouyonghe wants to merge 10 commits intoAstrBotDevs:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the overall security posture of the application by addressing various runtime, supply chain, and code-level vulnerabilities. It introduces stricter security configurations for container deployments, enforces secure communication protocols for external WebSocket connections, and eliminates potential injection risks in Python code and frontend rendering. Additionally, it updates several third-party dependencies to their latest secure versions and adds dedicated tests to prevent regressions in these critical security areas. Highlights
Changelog
Ignored Files
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Hey - 我发现了 6 个安全问题和 1 个其他问题,并给出了一些总体反馈:
安全问题:
- 检测到未使用静态字符串的子进程函数
run。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用shlex.escape()。(link) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
- 检测到未使用静态字符串的子进程函数
run。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用shlex.escape()。(link) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。(link)
总体评论:
_is_local_or_private_host这个辅助函数会将所有不包含点号的主机名(例如example、prod)视为本地主机,这会削弱对ws:///http://的限制;建议收紧逻辑,只接受明确的 localhost/回环地址、内网 IP,或者一个定义清晰的主机名后缀允许列表。- 在
IPythonToolBlock.vue中,后备路径上同时进行手动escapeHtml和DOMPurify.sanitize是重复的;你可以简化为仅依赖 DOMPurify(直接传入原始的code.value),或者保留手动转义但省略第二次 DOMPurify 清理步骤。
给 AI 代理的提示
请根据本次代码评审中的评论进行修改:
## 总体评论
- `_is_local_or_private_host` 这个辅助函数会将所有不包含点号的主机名(例如 `example`、`prod`)视为本地主机,这会削弱对 `ws://` / `http://` 的限制;建议收紧逻辑,只接受明确的 localhost/回环地址、内网 IP,或者一个定义清晰的主机名后缀允许列表。
- 在 `IPythonToolBlock.vue` 中,后备路径上同时进行手动 `escapeHtml` 和 `DOMPurify.sanitize` 是重复的;你可以简化为仅依赖 DOMPurify(直接传入原始的 `code.value`),或者保留手动转义但省略第二次 DOMPurify 清理步骤。
## 单条评论
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="11-13" />
<code_context>
+_ALLOWED_INSECURE_SUFFIXES = (".local", ".internal")
+
+
+def _is_local_or_private_host(hostname: str | None) -> bool:
+ if not hostname:
+ return False
+
+ normalized = hostname.strip("[]").lower()
+ if normalized == "localhost" or "." not in normalized:
+ return True
+ if normalized.endswith(_ALLOWED_INSECURE_SUFFIXES):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** 请重新考虑将所有不带点号的主机名都视为本地。
`normalized == "localhost" or "." not in normalized` 会将任何裸主机名(例如 `"prod"`、`"api"`)视为本地,因此 `http://prod` 或 `ws://api` 可能会绕过必须使用安全传输的要求。在很多环境中,这类短主机名其实是真正的生产端点。建议把范围收缩到明确的 localhost 名称(`localhost`、`127.0.0.1`、`::1`)和私有 IP 网段,并删除 `"." not in normalized` 这个启发式逻辑。
```suggestion
normalized = hostname.strip("[]").lower()
if normalized == "localhost":
return True
```
</issue_to_address>
### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到未使用静态字符串的子进程函数 `run`。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 `shlex.escape()`。
*Source: opengrep*
</issue_to_address>
### Comment 3
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。
*Source: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。
*Source: opengrep*
</issue_to_address>
### Comment 5
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到未使用静态字符串的子进程函数 `run`。如果该数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用场景,确保其不会被外部资源控制。你可以考虑使用 `shlex.escape()`。
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws` 或 `ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket Secure (wss)。
*Source: opengrep*
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English
Hey - I've found 6 security issues, 1 other issue, and left some high level feedback:
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
General comments:
- The
_is_local_or_private_hosthelper treats any hostname without a dot (e.g.example,prod) as local, which weakens thews:///http://restriction; consider tightening this to only accept explicit localhost/loopback, private IPs, or a clearly delimited allowlist of host suffixes. - In
IPythonToolBlock.vue, the manualescapeHtmlplusDOMPurify.sanitizeon the fallback path is redundant; you can simplify by either relying solely on DOMPurify (passing rawcode.value) or keeping the manual escaping but skipping the second sanitization step.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `_is_local_or_private_host` helper treats any hostname without a dot (e.g. `example`, `prod`) as local, which weakens the `ws://`/`http://` restriction; consider tightening this to only accept explicit localhost/loopback, private IPs, or a clearly delimited allowlist of host suffixes.
- In `IPythonToolBlock.vue`, the manual `escapeHtml` plus `DOMPurify.sanitize` on the fallback path is redundant; you can simplify by either relying solely on DOMPurify (passing raw `code.value`) or keeping the manual escaping but skipping the second sanitization step.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="11-13" />
<code_context>
+_ALLOWED_INSECURE_SUFFIXES = (".local", ".internal")
+
+
+def _is_local_or_private_host(hostname: str | None) -> bool:
+ if not hostname:
+ return False
+
+ normalized = hostname.strip("[]").lower()
+ if normalized == "localhost" or "." not in normalized:
+ return True
+ if normalized.endswith(_ALLOWED_INSECURE_SUFFIXES):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Reconsider treating any hostname without a dot as local-only.
`normalized == "localhost" or "." not in normalized` treats any bare hostname (e.g. `"prod"`, `"api"`) as local, so `http://prod` or `ws://api` could skip the secure-transport requirement. In many setups, such short names are real production endpoints. Consider restricting this to explicit localhost names (`localhost`, `127.0.0.1`, `::1`) and private IP ranges, and removing the `"." not in normalized` heuristic.
```suggestion
normalized = hostname.strip("[]").lower()
if normalized == "localhost":
return True
```
</issue_to_address>
### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 3
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 5
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws` 或 `ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Code Review
本次 PR 对整个项目进行了全面的安全加固,覆盖了后端、前端、部署等多个方面,质量非常高。主要改进包括:
- 防止 SQL 注入:通过在所有数据库查询中使用绑定参数,彻底修复了不安全的 SQL 拼接问题。
- 防范 XSS 攻击:在 Vue 组件和 Jinja2 模板中,移除了不安全的
v-html和| safe过滤器,并引入DOMPurify和| tojson进行输出编码和净化,有效防止了跨站脚本攻击。 - 强化运行时安全:为 Docker 容器和 Kubernetes Pod 添加了非 root 用户执行及严格的
securityContext配置,遵循了最小权限原则。 - 安全的子进程调用:修复了 Python 子进程调用,禁用了
shell执行并避免了基于环境变量的命令覆盖风险。 - 强制安全传输:对外部 WebSocket 连接强制使用
wss://,并增加了对本地非加密连接的兼容。 - 供应链安全:更新了多个存在漏洞的前端依赖,并锁定了 CI 中使用的第三方 GitHub Actions 版本。
- 全面的回归测试:为所有安全相关的修复都添加了对应的单元测试,确保了修复的有效性和长期稳定性。
整体而言,这是一次非常出色和彻底的安全重构,代码实现清晰、规范,显著提升了项目的整体安全性。没有发现需要修改的问题。
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - 我发现了 6 个安全问题、3 个其他问题,并留下了一些高层次的反馈:
安全问题:
- 发现对 subprocess 函数
run的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(链接) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接)
- 发现对 subprocess 函数
run的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(链接) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应该使用加密的 WebSocket(wss)。(链接)
总体意见:
- 在
to_websocket_url中,建议对输入的 scheme 进行校验并抛出清晰的错误(而不是依赖scheme_map查找),这样像ftp://这样的意外 scheme 会以明确的错误信息失败,而不是触发KeyError。 - Dockerfile 中和 Kubernetes
securityContext中硬编码的 UID/GID 1000 未来可能会发生偏移;建议把这些值集中管理,或者添加简短注释,确保镜像用户与 Pod 的runAsUser/fsGroup保持同步。
给 AI Agents 的提示词
Please address the comments from this code review:
## Overall Comments
- In `to_websocket_url`, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes like `ftp://` fail with an explicit message instead of a `KeyError`.
- The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes `securityContext` may drift over time; consider centralizing these values or adding a brief comment to keep the image user and pod `runAsUser`/`fsGroup` in sync.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-54" />
<code_context>
+ return parsed
+
+
+def to_websocket_url(url: str) -> str:
+ parsed = urlsplit(url.rstrip("/"))
+ scheme_map = {
+ "http": "ws",
+ "https": "wss",
+ "ws": "ws",
+ "wss": "wss",
+ }
+ return urlunsplit(
+ parsed._replace(scheme=scheme_map[parsed.scheme]),
+ )
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Consider integrating scheme validation into `to_websocket_url` or narrowing its visibility to avoid unsafe future usages.
This helper currently trusts callers and will convert `http://example.com` to `ws://example.com`. In this PR, all callers validate first, but a future caller could use it on unvalidated input and bypass the secure-transport requirement.
To harden this:
- Either have `to_websocket_url` enforce the scheme itself (e.g. call `require_secure_transport_url` or assert an allowed scheme), or
- Make it clearly internal (e.g. `_to_websocket_url`) or otherwise document that it must only be used with pre-validated URLs.
This reduces the chance of accidentally weakening the security guarantees later.
Suggested implementation:
```python
return parsed
def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
# Enforce secure-transport requirements before converting schemes.
parsed = require_secure_transport_url(url, label=label)
scheme_map = {
"http": "ws",
"https": "wss",
"ws": "ws",
"wss": "wss",
}
try:
ws_scheme = scheme_map[parsed.scheme]
except KeyError as exc:
raise ValueError(
f"{label} must use http://, https://, ws:// or wss://: {url}",
) from exc
return urlunsplit(parsed._replace(scheme=ws_scheme))
```
1. Ensure the validator used above is actually named `require_secure_transport_url` and is imported/in-scope in this module. If it has a different name or signature, adjust the call accordingly.
2. Since `to_websocket_url` now has an optional `label` keyword argument, callers that want custom error messages can pass it; existing callers will continue to work without changes, but if your codebase prefers not to expose `label` here, you can drop it and hard-code the label string inside the function.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="54-63" />
<code_context>
class TestLocalPythonComponent:
"""Tests for LocalPythonComponent."""
+ @pytest.mark.asyncio
+ async def test_exec_uses_fixed_python_executable(self):
+ """Test Python execution ignores dynamic executable overrides."""
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.
The existing `test_streaming_client_rejects_remote_http_instance` covers the rejection path for insecure remote HTTP URLs. To fully exercise `require_secure_transport_url`, `to_websocket_url`, and `StreamingClient.connect`, please also add a positive-path test that:
- Uses a local or private URL (e.g. `http://localhost:8080` or `https://example.com`),
- Patches `websockets.connect` and asserts it is awaited, and
- Verifies the URL passed to `websockets.connect` matches the expected `ws://`/`wss://` value from `to_websocket_url`.
</issue_to_address>
### Comment 3
<location path="tests/unit/test_t2i_security.py" line_range="23-32" />
<code_context>
+ assert return_url is False
+
+
+def test_t2i_templates_use_json_serialization_for_text():
+ template_paths = [
+ Path("astrbot/core/utils/t2i/template/base.html"),
+ Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"),
+ ]
+
+ for template_path in template_paths:
+ content = template_path.read_text(encoding="utf-8")
+ assert "text | safe" not in content
+ assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.
This test enforces the key invariant that T2I templates use `text | tojson` instead of `text | safe`, but it only checks two specific files. Please update it to enumerate all `*.html` files under `astrbot/core/utils/t2i/template/` (e.g. via `Path(.../template).glob('*.html')`) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.
</issue_to_address>
### Comment 4
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 5
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 8
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws` 或 `ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 9
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的 Review。
Original comment in English
Hey - I've found 6 security issues, 3 other issues, and left some high level feedback:
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
General comments:
- In
to_websocket_url, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes likeftp://fail with an explicit message instead of aKeyError. - The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes
securityContextmay drift over time; consider centralizing these values or adding a brief comment to keep the image user and podrunAsUser/fsGroupin sync.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `to_websocket_url`, consider validating the input scheme and raising a clear error (rather than relying on the scheme_map lookup) so unexpected schemes like `ftp://` fail with an explicit message instead of a `KeyError`.
- The hard-coded UID/GID 1000 in the Dockerfile and the Kubernetes `securityContext` may drift over time; consider centralizing these values or adding a brief comment to keep the image user and pod `runAsUser`/`fsGroup` in sync.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-54" />
<code_context>
+ return parsed
+
+
+def to_websocket_url(url: str) -> str:
+ parsed = urlsplit(url.rstrip("/"))
+ scheme_map = {
+ "http": "ws",
+ "https": "wss",
+ "ws": "ws",
+ "wss": "wss",
+ }
+ return urlunsplit(
+ parsed._replace(scheme=scheme_map[parsed.scheme]),
+ )
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Consider integrating scheme validation into `to_websocket_url` or narrowing its visibility to avoid unsafe future usages.
This helper currently trusts callers and will convert `http://example.com` to `ws://example.com`. In this PR, all callers validate first, but a future caller could use it on unvalidated input and bypass the secure-transport requirement.
To harden this:
- Either have `to_websocket_url` enforce the scheme itself (e.g. call `require_secure_transport_url` or assert an allowed scheme), or
- Make it clearly internal (e.g. `_to_websocket_url`) or otherwise document that it must only be used with pre-validated URLs.
This reduces the chance of accidentally weakening the security guarantees later.
Suggested implementation:
```python
return parsed
def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
# Enforce secure-transport requirements before converting schemes.
parsed = require_secure_transport_url(url, label=label)
scheme_map = {
"http": "ws",
"https": "wss",
"ws": "ws",
"wss": "wss",
}
try:
ws_scheme = scheme_map[parsed.scheme]
except KeyError as exc:
raise ValueError(
f"{label} must use http://, https://, ws:// or wss://: {url}",
) from exc
return urlunsplit(parsed._replace(scheme=ws_scheme))
```
1. Ensure the validator used above is actually named `require_secure_transport_url` and is imported/in-scope in this module. If it has a different name or signature, adjust the call accordingly.
2. Since `to_websocket_url` now has an optional `label` keyword argument, callers that want custom error messages can pass it; existing callers will continue to work without changes, but if your codebase prefers not to expose `label` here, you can drop it and hard-code the label string inside the function.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="54-63" />
<code_context>
class TestLocalPythonComponent:
"""Tests for LocalPythonComponent."""
+ @pytest.mark.asyncio
+ async def test_exec_uses_fixed_python_executable(self):
+ """Test Python execution ignores dynamic executable overrides."""
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.
The existing `test_streaming_client_rejects_remote_http_instance` covers the rejection path for insecure remote HTTP URLs. To fully exercise `require_secure_transport_url`, `to_websocket_url`, and `StreamingClient.connect`, please also add a positive-path test that:
- Uses a local or private URL (e.g. `http://localhost:8080` or `https://example.com`),
- Patches `websockets.connect` and asserts it is awaited, and
- Verifies the URL passed to `websockets.connect` matches the expected `ws://`/`wss://` value from `to_websocket_url`.
</issue_to_address>
### Comment 3
<location path="tests/unit/test_t2i_security.py" line_range="23-32" />
<code_context>
+ assert return_url is False
+
+
+def test_t2i_templates_use_json_serialization_for_text():
+ template_paths = [
+ Path("astrbot/core/utils/t2i/template/base.html"),
+ Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"),
+ ]
+
+ for template_path in template_paths:
+ content = template_path.read_text(encoding="utf-8")
+ assert "text | safe" not in content
+ assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.
This test enforces the key invariant that T2I templates use `text | tojson` instead of `text | safe`, but it only checks two specific files. Please update it to enumerate all `*.html` files under `astrbot/core/utils/t2i/template/` (e.g. via `Path(.../template).glob('*.html')`) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.
</issue_to_address>
### Comment 4
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 5
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="120" />
<code_context>
- For local deployment, fill in `URL` with `ws://HostIP:Port/ws`. For example, `ws://localhost:6199/ws` or `ws://127.0.0.1:6199/ws`.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="docs/en/platform/aiocqhttp/napcat.md" line_range="124" />
<code_context>
> 2. For cross-host or public network deployment, do not expose plain `ws://`. Put AstrBot behind an HTTPS/TLS reverse proxy and connect with `wss://your-domain/ws` instead.
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 8
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="113" />
<code_context>
- 本机部署时,`URL` 填写 `ws://宿主机IP:端口/ws`。如 `ws://localhost:6199/ws` 或 `ws://127.0.0.1:6199/ws`。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 9
<location path="docs/zh/platform/aiocqhttp/napcat.md" line_range="117" />
<code_context>
> 2. 如果是跨主机或公网部署,请不要暴露明文 `ws://`。应通过 HTTPS/TLS 反向代理暴露 AstrBot,并使用 `wss://你的域名/ws` 连接。
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| @pytest.mark.asyncio | ||
| async def test_streaming_client_rejects_remote_http_instance() -> None: | ||
| client = StreamingClient("http://example.com", "token") | ||
|
|
||
| with patch( | ||
| "astrbot.core.platform.sources.misskey.misskey_api.websockets.connect", | ||
| new_callable=AsyncMock, | ||
| ) as mock_connect: | ||
| assert await client.connect() is False | ||
|
|
There was a problem hiding this comment.
suggestion (testing): 建议添加一个 StreamingClient 的“正向路径”测试,用于验证安全/本地 URL 能够实际使用转换后的 WebSocket URL 建立连接。
当前的 test_streaming_client_rejects_remote_http_instance 已经覆盖了对不安全远程 HTTP URL 的拒绝路径。为了完整测试 require_secure_transport_url、to_websocket_url 和 StreamingClient.connect,请再添加一个正向路径测试,要求:
- 使用本地或私有 URL(例如
http://localhost:8080或https://example.com), - 对
websockets.connect做 patch,并断言其被 await 调用, - 验证传给
websockets.connect的 URL 与to_websocket_url生成的期望ws:///wss://值一致。
Original comment in English
suggestion (testing): Consider adding a positive-path StreamingClient test to assert that secure/local URLs actually connect using the transformed WebSocket URL.
The existing test_streaming_client_rejects_remote_http_instance covers the rejection path for insecure remote HTTP URLs. To fully exercise require_secure_transport_url, to_websocket_url, and StreamingClient.connect, please also add a positive-path test that:
- Uses a local or private URL (e.g.
http://localhost:8080orhttps://example.com), - Patches
websockets.connectand asserts it is awaited, and - Verifies the URL passed to
websockets.connectmatches the expectedws:///wss://value fromto_websocket_url.
| def test_t2i_templates_use_json_serialization_for_text(): | ||
| template_paths = [ | ||
| Path("astrbot/core/utils/t2i/template/base.html"), | ||
| Path("astrbot/core/utils/t2i/template/astrbot_powershell.html"), | ||
| ] | ||
|
|
||
| for template_path in template_paths: | ||
| content = template_path.read_text(encoding="utf-8") | ||
| assert "text | safe" not in content | ||
| assert "text | tojson" in content |
There was a problem hiding this comment.
suggestion (testing): 建议在测试中动态发现所有 T2I 模板,而不是硬编码固定的模板路径列表。
这个测试用于保证一个关键不变式:T2I 模板使用的是 text | tojson 而不是 text | safe,但它目前只检查了两个特定文件。请更新测试逻辑,枚举 astrbot/core/utils/t2i/template/ 目录下所有 *.html 文件(例如通过 Path(.../template).glob('*.html')),并对每个文件应用同样的断言,这样新模板会自动被覆盖,从而更好地防止安全回归。
Original comment in English
suggestion (testing): Consider discovering all T2I templates dynamically instead of hardcoding a fixed list of template paths in the test.
This test enforces the key invariant that T2I templates use text | tojson instead of text | safe, but it only checks two specific files. Please update it to enumerate all *.html files under astrbot/core/utils/t2i/template/ (e.g. via Path(.../template).glob('*.html')) and apply the same assertions to each one so new templates are automatically covered and security regressions are caught.
| result = subprocess.run( | ||
| [os.environ.get("PYTHON", sys.executable), "-c", code], | ||
| [sys.executable, "-c", code], | ||
| timeout=timeout, | ||
| capture_output=True, | ||
| text=True, | ||
| shell=False, | ||
| ) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用没有使用静态字符串。如果这些数据可能被恶意行为者控制,这可能会导致命令注入。请审计该调用的使用,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。
Source: opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - 我发现了 4 个安全问题,并给出了一些整体性的反馈:
安全问题:
- 检测到对子进程函数
run的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用shlex.escape()。(link) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。(link)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。(link)
- 检测到对子进程函数
run的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用shlex.escape()。(link)
总体评论:
- 在
SQLiteDatabase.get_filtered_conversations中,data_sql会把"FROM webchat_conversation"直接与where_sql拼接(而where_sql已经不再包含前导空格),从而生成FROM webchat_conversationWHERE ...。请在WHERE之前显式添加一个空格(例如"FROM webchat_conversation " + where_sql),以避免生成无效的 SQL。
给 AI Agent 的提示
请根据本次代码评审中的评论进行修改:
## Overall Comments
- 在 `SQLiteDatabase.get_filtered_conversations` 中,`data_sql` 会把 `"FROM webchat_conversation"` 直接与 `where_sql` 拼接(而 `where_sql` 已经不再包含前导空格),从而生成 `FROM webchat_conversationWHERE ...`。请在 `WHERE` 之前显式添加一个空格(例如 `"FROM webchat_conversation " + where_sql`),以避免生成无效的 SQL。
## Individual Comments
### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 `shlex.escape()`。
*来源: opengrep*
</issue_to_address>
### Comment 2
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。
*来源: opengrep*
</issue_to_address>
### Comment 3
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用加密的 WebSocket(wss)。
*来源: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 `shlex.escape()`。
*来源: opengrep*
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English
Hey - I've found 4 security issues, and left some high level feedback:
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
General comments:
- In
SQLiteDatabase.get_filtered_conversations,data_sqlconcatenates"FROM webchat_conversation"directly withwhere_sql(which no longer includes a leading space), producingFROM webchat_conversationWHERE ...; add an explicit space beforeWHERE(e.g.,"FROM webchat_conversation " + where_sql) to avoid generating invalid SQL.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, `data_sql` concatenates `"FROM webchat_conversation"` directly with `where_sql` (which no longer includes a leading space), producing `FROM webchat_conversationWHERE ...`; add an explicit space before `WHERE` (e.g., `"FROM webchat_conversation " + where_sql`) to avoid generating invalid SQL.
## Individual Comments
### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 2
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 3
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| result = subprocess.run( | ||
| [os.environ.get("PYTHON", sys.executable), "-c", code], | ||
| [sys.executable, "-c", code], | ||
| timeout=timeout, | ||
| capture_output=True, | ||
| text=True, | ||
| shell=False, | ||
| ) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()。
来源: opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
| print(f"Uploading to: {target}") | ||
|
|
||
| subprocess.run(cmd, check=True) | ||
| subprocess.run(cmd, check=True, shell=False) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用,其参数并非静态字符串。如果这些数据可能被恶意行为者控制,则可能构成命令注入。请审计该调用的使用,以确保它不受外部资源控制。你可以考虑使用 shlex.escape()。
来源: opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
SourceryAI
left a comment
There was a problem hiding this comment.
Hey - 我发现了 4 个安全问题、4 个其他问题,并给出了一些高层次反馈:
Security issues:
- 检测到对 subprocess 函数
run的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(link) - 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。(link)
- 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。(link)
- 检测到对 subprocess 函数
run的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(link)
General comments:
- 在 Kubernetes 部署清单中,主镜像(
mlikiowa/napcat-docker:latest和soulter/astrbot:latest)仍然使用latest标签;可以考虑像你在其他地方那样,为了实现加固和可复现部署的目标,将它们固定到明确的版本号。
面向 AI Agent 的提示
Please address the comments from this code review:
## Overall Comments
- 在 Kubernetes 部署清单中,主镜像(`mlikiowa/napcat-docker:latest` 和 `soulter/astrbot:latest`)仍然使用 `latest` 标签;可以考虑像你在其他地方那样,为了实现加固和可复现部署的目标,将它们固定到明确的版本号。
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-55" />
<code_context>
+ return parsed
+
+
+def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
+ normalized_url = url.rstrip("/")
+ parsed = urlsplit(normalized_url)
+ allowed_schemes = {"http", "https", "ws", "wss"}
+
+ if parsed.scheme not in allowed_schemes:
+ raise ValueError(
+ f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+ )
+
+ parsed = require_secure_transport_url(
+ normalized_url,
+ label=label,
+ allowed_schemes=allowed_schemes,
+ )
+ scheme_map = {
+ "http": "ws",
+ "https": "wss",
+ "ws": "ws",
+ "wss": "wss",
+ }
+
+ try:
+ ws_scheme = scheme_map[parsed.scheme]
+ except KeyError as exc:
+ raise ValueError(
+ f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+ ) from exc
+
+ return urlunsplit(
+ parsed._replace(scheme=ws_scheme),
+ )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 避免从 WebSocket URL 中去除所有尾部斜杠,以保留路径语义。
`to_websocket_url` 当前使用 `url.rstrip("/")`,这会去除*所有*尾部斜杠,从而改变某些路径的含义(例如 `https://example.com/app/` 与 `https://example.com/app`)。如果目标只是去掉用户多输入的单个尾部斜杠,可以考虑使用更有针对性的方法:
```python
normalized_url = url[:-1] if url.endswith("/") else url
```
或者完全跳过这一步「规范化」,直接依赖 `urlsplit`/`urlunsplit`,将任何路径规范化的责任留给调用方。这样可以避免在尾部斜杠有语义意义的端点上篡改路径。
</issue_to_address>
### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="12-19" />
<code_context>
+)
+
+
+def test_require_secure_transport_url_allows_local_ws() -> None:
+ parsed = require_secure_transport_url(
+ "ws://localhost:5140/satori/v1/events",
+ label="Satori WebSocket URL",
+ allowed_schemes={"ws", "wss"},
+ )
+
+ assert parsed.scheme == "ws"
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** 在使用不安全 ws:// 时考虑为私有 IP 和本地域名添加测试覆盖
当前已经覆盖了 `ws://localhost`。由于 `_is_local_or_private_host` 也接受私有/环回 IP 和内部域名后缀,请添加诸如 `ws://127.0.0.1/...`、`ws://10.0.0.5/...` 和 `ws://service.internal/...` 等案例,以锁定预期的允许列表,并防止主机分类逻辑回归。
</issue_to_address>
### Comment 3
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+ def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**suggestion (testing):** 考虑为无过滤条件的查询以及 `get_filtered_conversations` 中包含多个列表项的情况添加测试
为了完善覆盖率:
- 增加一个所有过滤参数都为空/None 的用例,并断言 `count_sql`/`data_sql` 中不存在 `WHERE` 子句,同时参数仅包含 `page_size` 和 `offset`。
- 增加一个包含多个 `platforms` 和/或 `message_types` 条目的用例,以验证会生成不同的参数名(例如 `:platform_0`、`:platform_1`),并且这些参数在 SQL 中被正确引用。
这将更好地防止查询构建逻辑的回归。
建议实现:
```python
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Ensure two queries (count + data) are executed with named parameters
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Basic contract: queries use named parameters and do not interpolate the search_query
assert ":" in count_sql
assert ":" in data_sql
# Dangerous search_query should not appear directly in the SQL but only in bound params
assert "x' OR 1=1 --" not in count_sql
assert "x' OR 1=1 --" not in data_sql
assert any(v == "x' OR 1=1 --" for v in count_params.values())
assert any(v == "x' OR 1=1 --" for v in data_params.values())
def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=25,
platforms=None,
message_types=None,
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
# We should still execute the standard count + data queries
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
normalized_count_sql = " ".join(count_sql.split()).upper()
normalized_data_sql = " ".join(data_sql.split()).upper()
# No WHERE clause should be present when no filters are provided
assert " WHERE " not in normalized_count_sql
assert " WHERE " not in normalized_data_sql
# Params for the data query should only include pagination
assert "page_size" in data_params
assert "offset" in data_params
assert len(data_params) == 2
# Count query may or may not use page_size; it must not include filter params
forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())
def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=10,
platforms=["qq", "slack"],
message_types=["group", "private"],
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Ensure multiple distinct parameter names are generated for list filters
platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}
# At least two distinct params for each list
assert len(platform_param_keys) >= 2
assert len(message_type_param_keys) >= 2
# All generated parameter names should appear in the SQL
for key in platform_param_keys | message_type_param_keys:
assert f":{key}" in data_sql
# And none of them are accidentally reused in a conflicting way
assert platform_param_keys.isdisjoint(message_type_param_keys)
```
这些测试假设:
1. `get_filtered_conversations` 通过 `cursor.execute` 恰好执行两条查询:先是计数查询,然后是数据查询。
2. 唯一的非过滤参数是 `page_size` 和 `offset`。
3. 列表过滤参数的名称前缀分别为 `platform_` 和 `message_type_`。
如果你的实现不同,请相应调整:
- `get_filtered_conversations` 调用中的参数列表(按需增加/删除关键字参数)。
- 针对 `count_params`/`data_params` 键的断言,以及参数名前缀(`platform_`、`message_type_`)以匹配你查询构造器的命名方案。
- 如果方法发出的查询方式不同,则调整对 `cursor.execute.call_count` 以及调用顺序的预期。
</issue_to_address>
### Comment 4
<location path="tests/unit/test_t2i_security.py" line_range="31-33" />
<code_context>
+ assert template_paths
+
+ for template_path in template_paths:
+ content = template_path.read_text(encoding="utf-8")
+ assert "text | safe" not in content
+ assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** 让模板安全性断言更具体地针对文本占位符,以避免误报
因为此检查只搜索子串 `"text | tojson"`,如果该文本出现在注释或非用户可控的值上,也有可能通过测试。为了更好地体现测试意图,可以改为匹配特定的模板占位符(例如通过正则匹配 `{{\s*text\s*\|\s*tojson\s*}}`),从而确保测试验证的是用户提供的 `text` 在每个模板中都经过安全序列化。
建议实现:
```python
import re
def test_t2i_templates_use_json_serialization_for_text():
```
```python
content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None
```
</issue_to_address>
### Comment 5
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对 subprocess 函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 `shlex.escape()`。
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。
*Source: opengrep*
</issue_to_address>
### Comment 8
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对 subprocess 函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 `shlex.escape()`。
*Source: opengrep*
</issue_to_address>Hi @zouyonghe! 👋
感谢你通过评论 @sourcery-ai review 来试用 Sourcery!🚀
安装 sourcery-ai bot,即可在每个 pull request 上自动获取代码审查 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会利用你的反馈来改进后续的审查。Original comment in English
Hey - I've found 4 security issues, 4 other issues, and left some high level feedback:
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
General comments:
- In the Kubernetes deployment manifests, the main images (
mlikiowa/napcat-docker:latestandsoulter/astrbot:latest) are still using thelatesttag; consider pinning them to explicit versions to match the hardened, reproducible deployment goal you applied elsewhere.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In the Kubernetes deployment manifests, the main images (`mlikiowa/napcat-docker:latest` and `soulter/astrbot:latest`) are still using the `latest` tag; consider pinning them to explicit versions to match the hardened, reproducible deployment goal you applied elsewhere.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="46-55" />
<code_context>
+ return parsed
+
+
+def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str:
+ normalized_url = url.rstrip("/")
+ parsed = urlsplit(normalized_url)
+ allowed_schemes = {"http", "https", "ws", "wss"}
+
+ if parsed.scheme not in allowed_schemes:
+ raise ValueError(
+ f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+ )
+
+ parsed = require_secure_transport_url(
+ normalized_url,
+ label=label,
+ allowed_schemes=allowed_schemes,
+ )
+ scheme_map = {
+ "http": "ws",
+ "https": "wss",
+ "ws": "ws",
+ "wss": "wss",
+ }
+
+ try:
+ ws_scheme = scheme_map[parsed.scheme]
+ except KeyError as exc:
+ raise ValueError(
+ f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
+ ) from exc
+
+ return urlunsplit(
+ parsed._replace(scheme=ws_scheme),
+ )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid stripping all trailing slashes from WebSocket URLs to preserve path semantics.
`to_websocket_url` currently uses `url.rstrip("/")`, which strips *all* trailing slashes and can change the meaning of some paths (e.g. `https://example.com/app/` vs `https://example.com/app`). If the goal is only to trim a single user-added slash, consider a more targeted approach:
```python
normalized_url = url[:-1] if url.endswith("/") else url
```
or skip this normalization entirely and rely on `urlsplit`/`urlunsplit`, leaving any path normalization to callers. This avoids altering endpoints where a trailing slash is significant.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_websocket_security.py" line_range="12-19" />
<code_context>
+)
+
+
+def test_require_secure_transport_url_allows_local_ws() -> None:
+ parsed = require_secure_transport_url(
+ "ws://localhost:5140/satori/v1/events",
+ label="Satori WebSocket URL",
+ allowed_schemes={"ws", "wss"},
+ )
+
+ assert parsed.scheme == "ws"
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding coverage for private IPs and local hostnames when using insecure ws://
This already covers `ws://localhost`. Since `_is_local_or_private_host` also accepts private/loopback IPs and internal suffixes, please add cases like `ws://127.0.0.1/...`, `ws://10.0.0.5/...`, and `ws://service.internal/...` to lock in the intended allowlist and prevent regressions in host classification.
</issue_to_address>
### Comment 3
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+ def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for no-filter queries and multiple list entries in get_filtered_conversations
To round out coverage:
- Add a case where all filter arguments are empty/None and assert that `count_sql`/`data_sql` have no `WHERE` clause and the params only include `page_size` and `offset`.
- Add a case with multiple `platforms` and/or `message_types` entries to verify distinct parameter names (e.g. `:platform_0`, `:platform_1`) are generated and correctly referenced in the SQL.
This will better protect against regressions in the query-building logic.
Suggested implementation:
```python
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Ensure two queries (count + data) are executed with named parameters
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Basic contract: queries use named parameters and do not interpolate the search_query
assert ":" in count_sql
assert ":" in data_sql
# Dangerous search_query should not appear directly in the SQL but only in bound params
assert "x' OR 1=1 --" not in count_sql
assert "x' OR 1=1 --" not in data_sql
assert any(v == "x' OR 1=1 --" for v in count_params.values())
assert any(v == "x' OR 1=1 --" for v in data_params.values())
def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=25,
platforms=None,
message_types=None,
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
# We should still execute the standard count + data queries
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
normalized_count_sql = " ".join(count_sql.split()).upper()
normalized_data_sql = " ".join(data_sql.split()).upper()
# No WHERE clause should be present when no filters are provided
assert " WHERE " not in normalized_count_sql
assert " WHERE " not in normalized_data_sql
# Params for the data query should only include pagination
assert "page_size" in data_params
assert "offset" in data_params
assert len(data_params) == 2
# Count query may or may not use page_size; it must not include filter params
forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())
def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=10,
platforms=["qq", "slack"],
message_types=["group", "private"],
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Ensure multiple distinct parameter names are generated for list filters
platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}
# At least two distinct params for each list
assert len(platform_param_keys) >= 2
assert len(message_type_param_keys) >= 2
# All generated parameter names should appear in the SQL
for key in platform_param_keys | message_type_param_keys:
assert f":{key}" in data_sql
# And none of them are accidentally reused in a conflicting way
assert platform_param_keys.isdisjoint(message_type_param_keys)
```
These tests assume:
1. `get_filtered_conversations` executes exactly two queries via `cursor.execute`: a count query first, then a data query.
2. The only non-filter parameters are `page_size` and `offset`.
3. Parameter names for list filters are prefixed as `platform_` and `message_type_`.
If your implementation differs, adjust:
- The argument list in the `get_filtered_conversations` calls (add/remove keyword arguments as needed).
- The assertions on `count_params`/`data_params` keys and the param-name prefixes (`platform_`, `message_type_`) to match the actual naming scheme used by your query builder.
- The expectations on `cursor.execute.call_count` and call order if the method issues queries differently.
</issue_to_address>
### Comment 4
<location path="tests/unit/test_t2i_security.py" line_range="31-33" />
<code_context>
+ assert template_paths
+
+ for template_path in template_paths:
+ content = template_path.read_text(encoding="utf-8")
+ assert "text | safe" not in content
+ assert "text | tojson" in content
</code_context>
<issue_to_address>
**suggestion (testing):** Make the template safety assertion more specific to the text placeholder to avoid false positives
Because this check only looks for the substring `"text | tojson"`, it could pass if that text appears in a comment or for a non-user-controlled value. To better enforce the intent, match the specific template placeholder instead (e.g., via regex for `{{\s*text\s*\|\s*tojson\s*}}`) so the test confirms the user-supplied `text` is safely serialized in each template.
Suggested implementation:
```python
import re
def test_t2i_templates_use_json_serialization_for_text():
```
```python
content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None
```
</issue_to_address>
### Comment 5
<location path="astrbot/core/computer/booters/local.py" line_range="114-120" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 6
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="53" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 7
<location path="astrbot/core/platform/sources/websocket_security.py" line_range="72" />
<code_context>
f"{label} must use http://, https://, ws:// or wss://: {normalized_url}",
</code_context>
<issue_to_address>
**security (javascript.lang.security.detect-insecure-websocket):** Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
*Source: opengrep*
</issue_to_address>
### Comment 8
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="223" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Hi @zouyonghe! 👋
Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀
Install the sourcery-ai bot to get automatic code reviews on every pull request ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.| def to_websocket_url(url: str, *, label: str = "WebSocket URL") -> str: | ||
| normalized_url = url.rstrip("/") | ||
| parsed = urlsplit(normalized_url) | ||
| allowed_schemes = {"http", "https", "ws", "wss"} | ||
|
|
||
| if parsed.scheme not in allowed_schemes: | ||
| raise ValueError( | ||
| f"{label} must use http://, https://, ws:// or wss://: {normalized_url}", | ||
| ) | ||
|
|
There was a problem hiding this comment.
suggestion (bug_risk): 避免从 WebSocket URL 中去除所有尾部斜杠,以保留路径语义。
to_websocket_url 当前使用 url.rstrip("/"),这会去除所有尾部斜杠,从而改变某些路径的含义(例如 https://example.com/app/ 与 https://example.com/app)。如果目标只是去掉用户多输入的单个尾部斜杠,可以考虑使用更有针对性的方法:
normalized_url = url[:-1] if url.endswith("/") else url或者完全跳过这一步「规范化」,直接依赖 urlsplit/urlunsplit,将任何路径规范化的责任留给调用方。这样可以避免在尾部斜杠有语义意义的端点上篡改路径。
Original comment in English
suggestion (bug_risk): Avoid stripping all trailing slashes from WebSocket URLs to preserve path semantics.
to_websocket_url currently uses url.rstrip("/"), which strips all trailing slashes and can change the meaning of some paths (e.g. https://example.com/app/ vs https://example.com/app). If the goal is only to trim a single user-added slash, consider a more targeted approach:
normalized_url = url[:-1] if url.endswith("/") else urlor skip this normalization entirely and rely on urlsplit/urlunsplit, leaving any path normalization to callers. This avoids altering endpoints where a trailing slash is significant.
| def test_require_secure_transport_url_allows_local_ws() -> None: | ||
| parsed = require_secure_transport_url( | ||
| "ws://localhost:5140/satori/v1/events", | ||
| label="Satori WebSocket URL", | ||
| allowed_schemes={"ws", "wss"}, | ||
| ) | ||
|
|
||
| assert parsed.scheme == "ws" |
There was a problem hiding this comment.
suggestion (testing): 在使用不安全 ws:// 时考虑为私有 IP 和本地域名添加测试覆盖
当前已经覆盖了 ws://localhost。由于 _is_local_or_private_host 也接受私有/环回 IP 和内部域名后缀,请添加诸如 ws://127.0.0.1/...、ws://10.0.0.5/... 和 ws://service.internal/... 等案例,以锁定预期的允许列表,并防止主机分类逻辑回归。
Original comment in English
suggestion (testing): Consider adding coverage for private IPs and local hostnames when using insecure ws://
This already covers ws://localhost. Since _is_local_or_private_host also accepts private/loopback IPs and internal suffixes, please add cases like ws://127.0.0.1/..., ws://10.0.0.5/..., and ws://service.internal/... to lock in the intended allowlist and prevent regressions in host classification.
| def test_get_filtered_conversations_uses_named_parameters(self): | ||
| db, cursor = self._create_db_with_mock_cursor() | ||
|
|
||
| db.get_filtered_conversations( | ||
| page=2, | ||
| page_size=10, | ||
| platforms=["qq"], | ||
| message_types=["group"], | ||
| search_query="x' OR 1=1 --", | ||
| exclude_ids=["admin"], |
There was a problem hiding this comment.
suggestion (testing): 考虑为无过滤条件的查询以及 get_filtered_conversations 中包含多个列表项的情况添加测试
为了完善覆盖率:
- 增加一个所有过滤参数都为空/None 的用例,并断言
count_sql/data_sql中不存在WHERE子句,同时参数仅包含page_size和offset。 - 增加一个包含多个
platforms和/或message_types条目的用例,以验证会生成不同的参数名(例如:platform_0、:platform_1),并且这些参数在 SQL 中被正确引用。
这将更好地防止查询构建逻辑的回归。
建议实现:
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Ensure two queries (count + data) are executed with named parameters
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Basic contract: queries use named parameters and do not interpolate the search_query
assert ":" in count_sql
assert ":" in data_sql
# Dangerous search_query should not appear directly in the SQL but only in bound params
assert "x' OR 1=1 --" not in count_sql
assert "x' OR 1=1 --" not in data_sql
assert any(v == "x' OR 1=1 --" for v in count_params.values())
assert any(v == "x' OR 1=1 --" for v in data_params.values())
def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=25,
platforms=None,
message_types=None,
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
# We should still execute the standard count + data queries
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
normalized_count_sql = " ".join(count_sql.split()).upper()
normalized_data_sql = " ".join(data_sql.split()).upper()
# No WHERE clause should be present when no filters are provided
assert " WHERE " not in normalized_count_sql
assert " WHERE " not in normalized_data_sql
# Params for the data query should only include pagination
assert "page_size" in data_params
assert "offset" in data_params
assert len(data_params) == 2
# Count query may or may not use page_size; it must not include filter params
forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())
def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=10,
platforms=["qq", "slack"],
message_types=["group", "private"],
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Ensure multiple distinct parameter names are generated for list filters
platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}
# At least two distinct params for each list
assert len(platform_param_keys) >= 2
assert len(message_type_param_keys) >= 2
# All generated parameter names should appear in the SQL
for key in platform_param_keys | message_type_param_keys:
assert f":{key}" in data_sql
# And none of them are accidentally reused in a conflicting way
assert platform_param_keys.isdisjoint(message_type_param_keys)这些测试假设:
get_filtered_conversations通过cursor.execute恰好执行两条查询:先是计数查询,然后是数据查询。- 唯一的非过滤参数是
page_size和offset。 - 列表过滤参数的名称前缀分别为
platform_和message_type_。
如果你的实现不同,请相应调整:
get_filtered_conversations调用中的参数列表(按需增加/删除关键字参数)。- 针对
count_params/data_params键的断言,以及参数名前缀(platform_、message_type_)以匹配你查询构造器的命名方案。 - 如果方法发出的查询方式不同,则调整对
cursor.execute.call_count以及调用顺序的预期。
Original comment in English
suggestion (testing): Consider adding tests for no-filter queries and multiple list entries in get_filtered_conversations
To round out coverage:
- Add a case where all filter arguments are empty/None and assert that
count_sql/data_sqlhave noWHEREclause and the params only includepage_sizeandoffset. - Add a case with multiple
platformsand/ormessage_typesentries to verify distinct parameter names (e.g.:platform_0,:platform_1) are generated and correctly referenced in the SQL.
This will better protect against regressions in the query-building logic.
Suggested implementation:
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query="x' OR 1=1 --",
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Ensure two queries (count + data) are executed with named parameters
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Basic contract: queries use named parameters and do not interpolate the search_query
assert ":" in count_sql
assert ":" in data_sql
# Dangerous search_query should not appear directly in the SQL but only in bound params
assert "x' OR 1=1 --" not in count_sql
assert "x' OR 1=1 --" not in data_sql
assert any(v == "x' OR 1=1 --" for v in count_params.values())
assert any(v == "x' OR 1=1 --" for v in data_params.values())
def test_get_filtered_conversations_without_filters_has_no_where_clause(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=25,
platforms=None,
message_types=None,
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
# We should still execute the standard count + data queries
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
normalized_count_sql = " ".join(count_sql.split()).upper()
normalized_data_sql = " ".join(data_sql.split()).upper()
# No WHERE clause should be present when no filters are provided
assert " WHERE " not in normalized_count_sql
assert " WHERE " not in normalized_data_sql
# Params for the data query should only include pagination
assert "page_size" in data_params
assert "offset" in data_params
assert len(data_params) == 2
# Count query may or may not use page_size; it must not include filter params
forbidden_prefixes = ("platform_", "message_type_", "exclude_id_", "exclude_platform_", "search_")
assert not any(k.startswith(forbidden_prefixes) for k in count_params.keys())
def test_get_filtered_conversations_with_multiple_list_entries_uses_distinct_params(self):
db, cursor = self._create_db_with_mock_cursor()
db.get_filtered_conversations(
page=1,
page_size=10,
platforms=["qq", "slack"],
message_types=["group", "private"],
search_query=None,
exclude_ids=None,
exclude_platforms=None,
)
assert cursor.execute.call_count == 2
count_sql, count_params = cursor.execute.call_args_list[0].args
data_sql, data_params = cursor.execute.call_args_list[1].args
# Ensure multiple distinct parameter names are generated for list filters
platform_param_keys = {k for k in data_params.keys() if k.startswith("platform_")}
message_type_param_keys = {k for k in data_params.keys() if k.startswith("message_type_")}
# At least two distinct params for each list
assert len(platform_param_keys) >= 2
assert len(message_type_param_keys) >= 2
# All generated parameter names should appear in the SQL
for key in platform_param_keys | message_type_param_keys:
assert f":{key}" in data_sql
# And none of them are accidentally reused in a conflicting way
assert platform_param_keys.isdisjoint(message_type_param_keys)These tests assume:
get_filtered_conversationsexecutes exactly two queries viacursor.execute: a count query first, then a data query.- The only non-filter parameters are
page_sizeandoffset. - Parameter names for list filters are prefixed as
platform_andmessage_type_.
If your implementation differs, adjust:
- The argument list in the
get_filtered_conversationscalls (add/remove keyword arguments as needed). - The assertions on
count_params/data_paramskeys and the param-name prefixes (platform_,message_type_) to match the actual naming scheme used by your query builder. - The expectations on
cursor.execute.call_countand call order if the method issues queries differently.
| content = template_path.read_text(encoding="utf-8") | ||
| assert "text | safe" not in content | ||
| assert "text | tojson" in content |
There was a problem hiding this comment.
suggestion (testing): 让模板安全性断言更具体地针对文本占位符,以避免误报
因为此检查只搜索子串 "text | tojson",如果该文本出现在注释或非用户可控的值上,也有可能通过测试。为了更好地体现测试意图,可以改为匹配特定的模板占位符(例如通过正则匹配 {{\s*text\s*\|\s*tojson\s*}}),从而确保测试验证的是用户提供的 text 在每个模板中都经过安全序列化。
建议实现:
import re
def test_t2i_templates_use_json_serialization_for_text(): content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not NoneOriginal comment in English
suggestion (testing): Make the template safety assertion more specific to the text placeholder to avoid false positives
Because this check only looks for the substring "text | tojson", it could pass if that text appears in a comment or for a non-user-controlled value. To better enforce the intent, match the specific template placeholder instead (e.g., via regex for {{\s*text\s*\|\s*tojson\s*}}) so the test confirms the user-supplied text is safely serialized in each template.
Suggested implementation:
import re
def test_t2i_templates_use_json_serialization_for_text(): content = template_path.read_text(encoding="utf-8")
assert "text | safe" not in content
assert re.search(r"{{\s*text\s*\|\s*tojson\s*}}", content) is not None| result = subprocess.run( | ||
| [os.environ.get("PYTHON", sys.executable), "-c", code], | ||
| [sys.executable, "-c", code], | ||
| timeout=timeout, | ||
| capture_output=True, | ||
| text=True, | ||
| shell=False, | ||
| ) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。
Source: opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
|
|
||
| if parsed.scheme not in allowed_schemes: | ||
| raise ValueError( | ||
| f"{label} must use http://, https://, ws:// or wss://: {normalized_url}", |
There was a problem hiding this comment.
security (javascript.lang.security.detect-insecure-websocket): 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。
Source: opengrep
Original comment in English
security (javascript.lang.security.detect-insecure-websocket): Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
Source: opengrep
| ws_scheme = scheme_map[parsed.scheme] | ||
| except KeyError as exc: | ||
| raise ValueError( | ||
| f"{label} must use http://, https://, ws:// or wss://: {normalized_url}", |
There was a problem hiding this comment.
security (javascript.lang.security.detect-insecure-websocket): 检测到不安全的 WebSocket。所有 WebSocket 连接都应使用安全 WebSocket(wss)。
Source: opengrep
Original comment in English
security (javascript.lang.security.detect-insecure-websocket): Insecure WebSocket Detected. WebSocket Secure (wss) should be used for all WebSocket connections.
Source: opengrep
| print(f"Uploading to: {target}") | ||
|
|
||
| subprocess.run(cmd, check=True) | ||
| subprocess.run(cmd, check=True, shell=False) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对 subprocess 函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用方式,确保它不能被外部资源控制。你可以考虑使用 shlex.escape()。
Source: opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - 我发现了 2 个安全问题、2 个其他问题,并留下了一些整体反馈:
安全问题:
- 检测到在使用子进程函数
run时没有使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用,确保其不能被外部资源控制。你可以考虑使用shlex.escape()。(链接) - 检测到在使用子进程函数
run时没有使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计此调用的使用,确保其不能被外部资源控制。你可以考虑使用shlex.escape()。(链接)
总体评论:
- 在
SQLiteDatabase.get_filtered_conversations中,当前data_sql的字符串拼接会生成FROM webchat_conversationWHERE ...(在WHERE之前缺少一个空格),这会在运行时导致查询失败;请在FROM webchat_conversation与插值的where_sql之间显式添加一个空格或换行。
提供给 AI Agents 的提示词
Please address the comments from this code review:
## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, the `data_sql` string concatenation currently produces `FROM webchat_conversationWHERE ...` (missing a space before `WHERE`), which will break the query at runtime; add an explicit space or newline between `FROM webchat_conversation` and the interpolated `where_sql`.
## Individual Comments
### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14" />
<code_context>
steps:
- name: checkout
- uses: actions/checkout@master
+ uses: actions/checkout@v6
- name: nodejs installation
uses: actions/setup-node@v6
</code_context>
<issue_to_address>
**issue (bug_risk):** The `actions/checkout@v6` tag does not currently exist and will cause workflow failures.
`actions/checkout` is only available up to `v4`. Please update this to `actions/checkout@v4` (or a specific `v4.x.y`), consistent with how the other actions are pinned, to avoid runtime resolution errors.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+ def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Strengthen assertions to ensure raw search input never appears in the data query SQL
This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for `data_sql` (e.g. `assert "x' OR 1=1 --" not in data_sql`) so both queries are covered for SQL injection hardening.
Suggested implementation:
```python
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
unsafe_search = "x' OR 1=1 --"
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query=unsafe_search,
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Expect two queries: a count(*) query and a data query
assert cursor.execute.call_count == 2
(count_sql, count_params), (data_sql, data_params) = [
call.args for call in cursor.execute.call_args_list
]
# Unsafe input must only appear in bound parameters for both queries,
# never interpolated into the raw SQL strings.
assert unsafe_search in count_params.values()
assert unsafe_search not in count_sql
assert unsafe_search in data_params.values()
assert unsafe_search not in data_sql
```
If the existing test already unpacks `cursor.execute.call_args_list` or asserts on `count_sql`/`count_params`, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (`*_sql`) does not contain the unsafe search string, while the parameters (`*_params`) do contain it.
</issue_to_address>
### Comment 3
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据这些反馈改进以后的 Review。
Original comment in English
Hey - I've found 2 security issues, 2 other issues, and left some high level feedback:
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
General comments:
- In
SQLiteDatabase.get_filtered_conversations, thedata_sqlstring concatenation currently producesFROM webchat_conversationWHERE ...(missing a space beforeWHERE), which will break the query at runtime; add an explicit space or newline betweenFROM webchat_conversationand the interpolatedwhere_sql.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `SQLiteDatabase.get_filtered_conversations`, the `data_sql` string concatenation currently produces `FROM webchat_conversationWHERE ...` (missing a space before `WHERE`), which will break the query at runtime; add an explicit space or newline between `FROM webchat_conversation` and the interpolated `where_sql`.
## Individual Comments
### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14" />
<code_context>
steps:
- name: checkout
- uses: actions/checkout@master
+ uses: actions/checkout@v6
- name: nodejs installation
uses: actions/setup-node@v6
</code_context>
<issue_to_address>
**issue (bug_risk):** The `actions/checkout@v6` tag does not currently exist and will cause workflow failures.
`actions/checkout` is only available up to `v4`. Please update this to `actions/checkout@v4` (or a specific `v4.x.y`), consistent with how the other actions are pinned, to avoid runtime resolution errors.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_sqlite_v3.py" line_range="44-53" />
<code_context>
+ def test_get_filtered_conversations_uses_named_parameters(self):
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Strengthen assertions to ensure raw search input never appears in the data query SQL
This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for `data_sql` (e.g. `assert "x' OR 1=1 --" not in data_sql`) so both queries are covered for SQL injection hardening.
Suggested implementation:
```python
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
unsafe_search = "x' OR 1=1 --"
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query=unsafe_search,
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Expect two queries: a count(*) query and a data query
assert cursor.execute.call_count == 2
(count_sql, count_params), (data_sql, data_params) = [
call.args for call in cursor.execute.call_args_list
]
# Unsafe input must only appear in bound parameters for both queries,
# never interpolated into the raw SQL strings.
assert unsafe_search in count_params.values()
assert unsafe_search not in count_sql
assert unsafe_search in data_params.values()
assert unsafe_search not in data_sql
```
If the existing test already unpacks `cursor.execute.call_args_list` or asserts on `count_sql`/`count_params`, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (`*_sql`) does not contain the unsafe search string, while the parameters (`*_params`) do contain it.
</issue_to_address>
### Comment 3
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 4
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| steps: | ||
| - name: checkout | ||
| uses: actions/checkout@master | ||
| uses: actions/checkout@v6 |
There was a problem hiding this comment.
issue (bug_risk): actions/checkout@v6 这个标签目前并不存在,会导致工作流失败。
actions/checkout 目前最高只到 v4。请将其更新为 actions/checkout@v4(或具体的 v4.x.y 版本),并与其他已固定版本的 actions 保持一致,以避免运行时解析错误。
Original comment in English
issue (bug_risk): The actions/checkout@v6 tag does not currently exist and will cause workflow failures.
actions/checkout is only available up to v4. Please update this to actions/checkout@v4 (or a specific v4.x.y), consistent with how the other actions are pinned, to avoid runtime resolution errors.
| def test_get_filtered_conversations_uses_named_parameters(self): | ||
| db, cursor = self._create_db_with_mock_cursor() | ||
|
|
||
| db.get_filtered_conversations( | ||
| page=2, | ||
| page_size=10, | ||
| platforms=["qq"], | ||
| message_types=["group"], | ||
| search_query="x' OR 1=1 --", | ||
| exclude_ids=["admin"], |
There was a problem hiding this comment.
🚨 suggestion (security): 加强断言,确保原始搜索输入永远不会出现在数据查询的 SQL 中。
目前已经确认了不安全输入只会出现在计数查询(count query)的绑定参数中。请为 data_sql 添加等效的断言(例如 assert "x' OR 1=1 --" not in data_sql),从而让这两个查询都覆盖到 SQL 注入加固场景。
建议实现:
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
unsafe_search = "x' OR 1=1 --"
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query=unsafe_search,
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Expect two queries: a count(*) query and a data query
assert cursor.execute.call_count == 2
(count_sql, count_params), (data_sql, data_params) = [
call.args for call in cursor.execute.call_args_list
]
# Unsafe input must only appear in bound parameters for both queries,
# never interpolated into the raw SQL strings.
assert unsafe_search in count_params.values()
assert unsafe_search not in count_sql
assert unsafe_search in data_params.values()
assert unsafe_search not in data_sql如果现有测试已经解包了 cursor.execute.call_args_list,或者已经对 count_sql / count_params 做了断言,你可以考虑把这些已有断言合并进上述重写后的测试体,而不是完全替换掉。关键要求是:对这两个查询(count 和 data),都要断言原始 SQL(*_sql)不包含不安全的搜索字符串,同时参数(*_params)中必须包含该字符串。
Original comment in English
🚨 suggestion (security): Strengthen assertions to ensure raw search input never appears in the data query SQL
This already confirms the unsafe input only appears in the bound parameters for the count query. Please add an equivalent assertion for data_sql (e.g. assert "x' OR 1=1 --" not in data_sql) so both queries are covered for SQL injection hardening.
Suggested implementation:
def test_get_filtered_conversations_uses_named_parameters(self):
db, cursor = self._create_db_with_mock_cursor()
unsafe_search = "x' OR 1=1 --"
db.get_filtered_conversations(
page=2,
page_size=10,
platforms=["qq"],
message_types=["group"],
search_query=unsafe_search,
exclude_ids=["admin"],
exclude_platforms=["slack"],
)
# Expect two queries: a count(*) query and a data query
assert cursor.execute.call_count == 2
(count_sql, count_params), (data_sql, data_params) = [
call.args for call in cursor.execute.call_args_list
]
# Unsafe input must only appear in bound parameters for both queries,
# never interpolated into the raw SQL strings.
assert unsafe_search in count_params.values()
assert unsafe_search not in count_sql
assert unsafe_search in data_params.values()
assert unsafe_search not in data_sqlIf the existing test already unpacks cursor.execute.call_args_list or asserts on count_sql/count_params, you may want to merge those existing assertions into this rewritten body instead of replacing them entirely. The key requirement is that both queries (count and data) assert that the raw SQL (*_sql) does not contain the unsafe search string, while the parameters (*_params) do contain it.
| result = subprocess.run( | ||
| [os.environ.get("PYTHON", sys.executable), "-c", code], | ||
| [sys.executable, "-c", code], | ||
| timeout=timeout, | ||
| capture_output=True, | ||
| text=True, | ||
| shell=False, | ||
| ) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计该调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()。
来源:opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
| subprocess.run(cmd, check=True) | ||
| # nosemgrep: python.lang.security.audit.dangerous-subprocess-use-audit | ||
| # Uses an argv list with shell=False after checking that rclone exists in PATH. | ||
| subprocess.run(cmd, check=True, shell=False) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,则可能存在命令注入风险。请审计该调用的使用,确保其不能被外部资源控制。你可以考虑使用 shlex.escape()。
来源:opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - 我发现了 2 个安全问题
安全问题:
- 检测到在使用子进程函数
run时没有使用静态字符串。如果这些数据可以被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用方式,以确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(链接) - 检测到在使用子进程函数
run时没有使用静态字符串。如果这些数据可以被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用方式,以确保它不能被外部资源控制。你可以考虑使用shlex.escape()。(链接)
供 AI 代理使用的提示词
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 2
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>帮我变得更有用!请对每条评论点击 👍 或 👎,我会根据你的反馈改进后续的评审。
Original comment in English
Hey - I've found 2 security issues
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 2
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - 我发现了 2 个安全问题和 1 个其他问题。
安全问题:
- 检测到对子进程函数
run的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用shlex.escape()。(link) - 检测到对子进程函数
run的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用shlex.escape()。(link)
面向 AI Agent 的提示词
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14-16" />
<code_context>
steps:
- name: checkout
- uses: actions/checkout@master
+ uses: actions/checkout@v6
- name: nodejs installation
uses: actions/setup-node@v6
with:
</code_context>
<issue_to_address>
**issue (bug_risk):** 固定的 GitHub Actions 版本 `actions/checkout@v6` 和 `actions/setup-node@v6` 很可能尚不存在,会导致工作流中断。
目前最新发布的主版本是 `actions/checkout@v4` 和 `actions/setup-node@v4`。使用 `@v6` 将会导致工作流在运行时失败。请固定到已存在的版本(例如 `@v4` 或具体的 `@v4.x.y`),以在保持版本显式可控的同时确保工作流可正常运行。
</issue_to_address>
### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 `shlex.escape()`。
*来源:opengrep*
</issue_to_address>
### Comment 3
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** 检测到对子进程函数 `run` 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 `shlex.escape()`。
*来源:opengrep*
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English
Hey - I've found 2 security issues, and 1 other issue
Security issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path=".github/workflows/build-docs.yml" line_range="14-16" />
<code_context>
steps:
- name: checkout
- uses: actions/checkout@master
+ uses: actions/checkout@v6
- name: nodejs installation
uses: actions/setup-node@v6
with:
</code_context>
<issue_to_address>
**issue (bug_risk):** Pinned GitHub Actions versions `actions/checkout@v6` and `actions/setup-node@v6` likely do not exist yet and will break the workflow.
The latest published majors are currently `actions/checkout@v4` and `actions/setup-node@v4`. Using `@v6` will cause the workflow to fail at runtime. Please pin to existing versions (e.g. `@v4` or a specific `@v4.x.y`) to keep the workflow functional while remaining explicit about versions.
</issue_to_address>
### Comment 2
<location path="astrbot/core/computer/booters/local.py" line_range="116-122" />
<code_context>
result = subprocess.run(
[sys.executable, "-c", code],
timeout=timeout,
capture_output=True,
text=True,
shell=False,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 3
<location path="docs/scripts/upload_doc_images_to_r2.py" line_range="225" />
<code_context>
subprocess.run(cmd, check=True, shell=False)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| result = subprocess.run( | ||
| [os.environ.get("PYTHON", sys.executable), "-c", code], | ||
| [sys.executable, "-c", code], | ||
| timeout=timeout, | ||
| capture_output=True, | ||
| text=True, | ||
| shell=False, | ||
| ) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()。
来源:opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
| subprocess.run(cmd, check=True) | ||
| # nosemgrep: python.lang.security.audit.dangerous-subprocess-use-audit | ||
| # Uses an argv list with shell=False after checking that rclone exists in PATH. | ||
| subprocess.run(cmd, check=True, shell=False) |
There was a problem hiding this comment.
security (python.lang.security.audit.dangerous-subprocess-use-audit): 检测到对子进程函数 run 的调用未使用静态字符串。如果这些数据可能被恶意行为者控制,这可能构成命令注入风险。请审计此调用的使用,确保它不会被外部资源控制。你可以考虑使用 shlex.escape()。
来源:opengrep
Original comment in English
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep
Summary
This PR hardens the code paths that were confirmed as real security issues in the current scanner/review pass, while deliberately leaving false positives and compatibility-sensitive items unchanged.
What changed
Python backend
astrbot/core/db/migration/sqlite_v3.pyand added regression coverage to ensure unsafe search text is only passed via bound parameters.sys.executable) withshell=Falseinastrbot/core/computer/booters/local.py, plus an explicit audit note and tests.shell=Falseindocs/scripts/upload_doc_images_to_r2.py, plus an explicit audit note and tests.WebSocket transport hardening
astrbot/core/platform/sources/websocket_security.pyas a shared transport validator/converter.prod/apiare no longer treated as local.to_websocket_url()so unsupported schemes fail with a clearValueErrorinstead of relying on a mapping lookup.StreamingClient.T2I templates and frontend rendering
tojsonin the T2I HTML templates.astrbot/core/utils/t2i/network_strategy.pybecause JSON serialization now handles that safely.v-htmlusage in the dashboard/docs UI:IPythonToolBlock.vueAstrBotConfigV4.vueLogo.vuev-htmlusage indocs/.vitepress/theme/components/ArticleShare.vueDashboard dependency remediation
vite,axios,lodash,markdown-it,dompurify, and related transitive packages.CI / workflow hygiene
@masterwith explicit released version tags across the touched GitHub Actions workflows.Kubernetes runtime hardening
seccompProfile: RuntimeDefault,allowPrivilegeEscalation: false, andcapabilities.drop: [ALL]to the affected Kubernetes workloads.Tests
tests/unit/test_computer.pytests/unit/test_sqlite_v3.pytests/unit/test_t2i_security.pytests/unit/test_websocket_security.pydocs/tests/test_upload_doc_images_to_r2.pyWhy these changes
The goal of this PR is to fix the scanner findings that correspond to real exploit paths without widening the scope into unrelated product or deployment policy changes.
In particular:
False positives / intentional scope decisions
These items came up during scanning/review but are intentionally not changed in this PR:
ws://examples: the temporary doc rewrite was reverted. Those examples document local or same-network setups and are not the runtime security boundary. The runtime code changes for Misskey/Satori remain in place.subprocess.runfindings inlocal.pyandupload_doc_images_to_r2.py: these calls are audited argv-list invocations withshell=False, not shell-string command construction. This PR keeps the safe calling pattern and records the audit intent inline.latesttags in Kubernetes: this PR does not change image mutability policy. The branch keeps the currentlatesttags by request.Test Plan
uv run pytest tests/unit/test_computer.py tests/unit/test_sqlite_v3.py tests/unit/test_t2i_security.py tests/unit/test_websocket_security.py docs/tests/test_upload_doc_images_to_r2.pyuv run ruff check astrbot/core/computer/booters/local.py astrbot/core/db/migration/sqlite_v3.py astrbot/core/platform/sources/websocket_security.py astrbot/core/platform/sources/misskey/misskey_api.py astrbot/core/platform/sources/satori/satori_adapter.py docs/scripts/upload_doc_images_to_r2.py tests/unit/test_sqlite_v3.py tests/unit/test_t2i_security.py tests/unit/test_websocket_security.pyuv run ruff format --check .pnpm build(indashboard)pnpm docs:build(indocs)pnpm auditchecks for the remediated dashboard advisoriesNot run
docker build -f Dockerfile .(local Docker daemon unavailable in this environment)由 Sourcery 提供的摘要
在加强运行时、传输层和前端安全性的同时,进一步收紧 CI、Kubernetes 与依赖项配置。
Bug 修复:
增强:
构建:
CI:
master或主版本等可变分支标签。测试:
Original summary in English
Summary by Sourcery
在强化运行时、传输层和前端安全性的同时,收紧 CI、Kubernetes 和仪表盘依赖配置。
Bug 修复:
增强:
构建:
CI:
测试:
Original summary in English
Summary by Sourcery
Harden runtime, transport, and frontend security while tightening CI, Kubernetes, and dashboard dependency configurations.
Bug Fixes:
Enhancements:
Build:
CI:
Tests: