From bc648ee72295d6753116adec93228825c6b41495 Mon Sep 17 00:00:00 2001 From: liangzexin96 Date: Fri, 5 Jun 2026 09:53:21 +0800 Subject: [PATCH 1/4] feat(qqapp): support media in/out for QQ bot Inbound: parse data.attachments (image/voice/file) instead of text-only, so the agent can read images, files and voice URLs sent by users. Outbound: send_done emits files marked with [FILE:path] as rich media via the two-step QQ API (post_c2c_file/post_group_file -> post_*_message msg_type=7). Public URL for QQ's reverse-fetch is provided by a new self-contained module frontends/puburl.py using a cloudflared quick tunnel (auto-downloads the binary on demand, grabs the tunnel URL at runtime, warms up the edge to avoid first-request SSL EOF). No account, fixed domain or manual config needed; reproducible on any machine with outbound network. Reuses existing extract_files/strip_files in chatapp_common; no new deps (aiohttp already declared). --- frontends/puburl.py | 244 ++++++++++++++++++++++++++++++++++++++++++++ frontends/qqapp.py | 137 ++++++++++++++++++++++++- 2 files changed, 376 insertions(+), 5 deletions(-) create mode 100644 frontends/puburl.py diff --git a/frontends/puburl.py b/frontends/puburl.py new file mode 100644 index 000000000..6056c3f9f --- /dev/null +++ b/frontends/puburl.py @@ -0,0 +1,244 @@ +""" +puburl.py — 把本地文件临时暴露成公网 HTTPS URL,供 QQ 富媒体出站使用。 + +QQ 出站富媒体(图片/视频/语音/文件)不接受字节直传,必须给腾讯一个公网 URL +让它反向拉取。本模块在【任意机器】上自包含地解决这个依赖: + + 本地文件 -> 复制到内部 serve 目录 -> 内置 HTTP 文件服务(127.0.0.1:随机端口) + -> cloudflared quick tunnel 出站建连 -> 公网 https://xxx.trycloudflare.com// + +设计目标(换任何电脑部署 GA、重启即复现): + 1. 隧道 URL 每次重启都变 —— 运行期从 cloudflared 输出实时抓取,绝不硬编码。 + 2. cloudflared 缺失 —— 按当前 OS/架构自动从官方 GitHub 下载到 .portable/tools/。 + 3. 纯标准库实现 HTTP 服务,无额外依赖。 + +安全:文件放在以 uuid4 token 命名的子目录下,URL 不可枚举;隧道地址本身随机; +仅在隧道存活期间可达,并自动清理超过 TTL 的旧文件。 +""" + +import atexit +import functools +import http.server +import os +import platform +import re +import shutil +import socket +import stat +import subprocess +import threading +import time +import urllib.request +import uuid + +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +TOOLS_DIR = os.path.join(ROOT, ".portable", "tools") +SERVE_DIR = os.path.join(ROOT, "temp", "_pubserve") +FILE_TTL = 3600 # 已发布文件保留秒数,超过则清理 + +_TUNNEL_RE = re.compile(r"https://[-a-z0-9]+\.trycloudflare\.com", re.I) + + +def _log(msg): + print(f"[puburl] {msg}", flush=True) + + +class _QuietHandler(http.server.SimpleHTTPRequestHandler): + """静默版文件 handler(不污染 qqapp.log)。""" + + def log_message(self, *args): + pass + + +def _cf_asset(): + """返回 (github资产名, 本地二进制文件名),按当前系统/架构。""" + sysname = platform.system().lower() + machine = platform.machine().lower() + if machine in ("aarch64", "arm64"): + arch = "arm64" + elif machine in ("x86_64", "amd64", "x64"): + arch = "amd64" + elif "386" in machine or "i686" in machine or "i386" in machine: + arch = "386" + elif machine.startswith("arm"): + arch = "arm" + else: + arch = "amd64" + if sysname == "windows": + return f"cloudflared-windows-{arch}.exe", "cloudflared.exe" + if sysname == "linux": + return f"cloudflared-linux-{arch}", "cloudflared" + if sysname == "darwin": + # macOS 官方只发布 .tgz + return f"cloudflared-darwin-{arch}.tgz", "cloudflared" + return f"cloudflared-linux-{arch}", "cloudflared" + + +class PublicFileServer: + def __init__(self): + self._lock = threading.Lock() + self._httpd = None + self._http_port = None + self._cf_proc = None + self._tunnel_url = None + + # ---- cloudflared 二进制 ---- + def _ensure_cloudflared(self): + asset, binname = _cf_asset() + os.makedirs(TOOLS_DIR, exist_ok=True) + binpath = os.path.join(TOOLS_DIR, binname) + if os.path.exists(binpath) and os.path.getsize(binpath) > 0: + return binpath + url = f"https://github.com/cloudflare/cloudflared/releases/latest/download/{asset}" + _log(f"cloudflared 未找到,开始下载: {asset}") + tmp = binpath + ".part" + req = urllib.request.Request(url, headers={"User-Agent": "GA-puburl"}) + with urllib.request.urlopen(req, timeout=120) as resp, open(tmp, "wb") as f: + shutil.copyfileobj(resp, f) + if asset.endswith(".tgz"): + import tarfile + with tarfile.open(tmp) as tar: + member = next((m for m in tar.getmembers() if m.name.endswith("cloudflared")), None) + if not member: + raise RuntimeError("tgz 内未找到 cloudflared") + with tar.extractfile(member) as src, open(binpath, "wb") as dst: + shutil.copyfileobj(src, dst) + os.remove(tmp) + else: + os.replace(tmp, binpath) + if platform.system().lower() != "windows": + os.chmod(binpath, os.stat(binpath).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) + _log(f"cloudflared 已就绪: {binpath} ({os.path.getsize(binpath)} bytes)") + return binpath + + # ---- 本地 HTTP 文件服务 ---- + def _start_http(self): + os.makedirs(SERVE_DIR, exist_ok=True) + handler = functools.partial(_QuietHandler, directory=SERVE_DIR) + httpd = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler) + self._http_port = httpd.server_address[1] + self._httpd = httpd + threading.Thread(target=httpd.serve_forever, daemon=True).start() + _log(f"本地文件服务启动于 127.0.0.1:{self._http_port}") + + # ---- cloudflared 隧道 ---- + def _start_tunnel(self, binpath): + cmd = [ + binpath, "tunnel", + "--no-autoupdate", + "--url", f"http://127.0.0.1:{self._http_port}", + ] + self._cf_proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + text=True, encoding="utf-8", errors="replace", bufsize=1, + ) + found = threading.Event() + + def _reader(): + for line in self._cf_proc.stdout: + if self._tunnel_url is None: + m = _TUNNEL_RE.search(line) + if m: + self._tunnel_url = m.group(0) + _log(f"隧道已建立: {self._tunnel_url}") + found.set() + threading.Thread(target=_reader, daemon=True).start() + if not found.wait(timeout=45): + raise RuntimeError("等待 cloudflared 隧道 URL 超时") + self._warmup(self._tunnel_url) + + def _warmup(self, base_url): + """隧道刚建立时边缘节点可能尚未就绪,首次请求会 SSL EOF。 + 这里自探到拿回任意 HTTP 响应为止,避免腾讯首次反向拉取失败。""" + import ssl + ctx = ssl.create_default_context() + for i in range(10): + try: + req = urllib.request.Request(base_url, headers={"User-Agent": "ga-warmup"}) + urllib.request.urlopen(req, timeout=15, context=ctx) + _log(f"隧道边缘就绪 (warmup#{i})") + return True + except urllib.error.HTTPError: + # 有 HTTP 响应(如 404)即说明边缘已就绪 + _log(f"隧道边缘就绪 (warmup#{i}, http)") + return True + except Exception: + time.sleep(3) + _log("隧道预热未确认就绪,继续(腾讯侧可能首拉失败)") + return False + + def ensure_started(self): + with self._lock: + if self._tunnel_url and self._cf_proc and self._cf_proc.poll() is None: + return self._tunnel_url + # 隧道挂了则重置重建 + if self._cf_proc and self._cf_proc.poll() is not None: + _log("检测到 cloudflared 已退出,重建隧道") + self._tunnel_url = None + if self._httpd is None: + self._start_http() + binpath = self._ensure_cloudflared() + self._start_tunnel(binpath) + return self._tunnel_url + + # ---- 清理过期文件 ---- + def _cleanup(self): + now = time.time() + try: + for name in os.listdir(SERVE_DIR): + p = os.path.join(SERVE_DIR, name) + try: + if now - os.path.getmtime(p) > FILE_TTL: + shutil.rmtree(p, ignore_errors=True) if os.path.isdir(p) else os.remove(p) + except OSError: + pass + except FileNotFoundError: + pass + + # ---- 对外接口 ---- + def publish(self, local_path): + """把本地文件复制到 serve 目录并返回公网 URL;失败返回 None。""" + if not os.path.isfile(local_path): + return None + url = self.ensure_started() + if not url: + return None + self._cleanup() + token = uuid.uuid4().hex + dest_dir = os.path.join(SERVE_DIR, token) + os.makedirs(dest_dir, exist_ok=True) + fname = os.path.basename(local_path) + shutil.copy2(local_path, os.path.join(dest_dir, fname)) + return f"{url}/{token}/{urllib.request.quote(fname)}" + + def shutdown(self): + if self._cf_proc and self._cf_proc.poll() is None: + self._cf_proc.terminate() + if self._httpd: + self._httpd.shutdown() + + +_INSTANCE = PublicFileServer() +atexit.register(_INSTANCE.shutdown) + + +def publish(local_path): + return _INSTANCE.publish(local_path) + + +def ensure_started(): + return _INSTANCE.ensure_started() + + +if __name__ == "__main__": + # 自测:发布一个临时文件并打印 URL + import sys + test_file = sys.argv[1] if len(sys.argv) > 1 else __file__ + print("publishing:", test_file) + print("URL:", publish(test_file)) + print("Ctrl-C to stop"); + try: + while True: + time.sleep(5) + except KeyboardInterrupt: + _INSTANCE.shutdown() diff --git a/frontends/qqapp.py b/frontends/qqapp.py index 2ad088f88..b2d982f40 100644 --- a/frontends/qqapp.py +++ b/frontends/qqapp.py @@ -1,10 +1,12 @@ -import asyncio, os, sys, threading, time +import asyncio, os, re, sys, threading, time, uuid from collections import deque sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +_TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp') from agentmain import GeneraticAgent -from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text +from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text, extract_files, strip_files, clean_reply from llmcore import mykeys +import puburl try: import botpy @@ -28,6 +30,101 @@ def _next_msg_seq(): return MSG_SEQ +# QQ 出站富媒体类型:1=图片 2=视频 3=语音 4=文件(按后缀判定) +_EXT_FILE_TYPE = { + ".jpg": 1, ".jpeg": 1, ".png": 1, ".gif": 1, ".bmp": 1, ".webp": 1, + ".mp4": 2, ".mov": 2, ".avi": 2, ".mkv": 2, + ".silk": 3, ".amr": 3, ".mp3": 3, ".wav": 3, ".m4a": 3, +} + + +def _qq_file_type(path): + return _EXT_FILE_TYPE.get(os.path.splitext(path)[1].lower(), 4) + + +# QQ 附件 content_type: 1=图片 2=视频 3=语音 4=文件;不同消息类型字段可能不全,按后缀/url 兜底 +def _guess_ext(att, kind): + fn = getattr(att, "filename", "") or "" + ext = os.path.splitext(fn)[1] + if ext: + return ext + url = (getattr(att, "url", "") or "").split("?")[0] + ext = os.path.splitext(url)[1] + if ext: + return ext + ct = getattr(att, "content_type", None) + name = str(ct).lower() if ct is not None else "" + for key, e in (("image", ".jpg"), ("voice", ".silk"), ("audio", ".silk"), ("video", ".mp4")): + if key in name: + return e + return {1: ".jpg", 2: ".mp4", 3: ".silk", 4: ".dat"}.get(ct, ".dat") + + +def _kind_label(att): + ct = getattr(att, "content_type", None) + name = str(ct).lower() if ct is not None else "" + if ct == 1 or "image" in name: + return "图片" + if ct == 2 or "video" in name: + return "视频" + if ct == 3 or "voice" in name or "audio" in name: + return "语音" + return "文件" + + +async def _download_attachments(data): + """下载消息附件到 temp/,返回 [(kind_label, 'temp/xxx'), ...]。失败的跳过。""" + atts = getattr(data, "attachments", None) or [] + if not atts: + return [] + import aiohttp + os.makedirs(_TEMP_DIR, exist_ok=True) + saved = [] + async with aiohttp.ClientSession() as sess: + for att in atts: + url = getattr(att, "url", "") or "" + if not url: + continue + if url.startswith("//"): + url = "https:" + url + elif url.startswith("http://"): + url = "https://" + url[len("http://"):] + kind = _kind_label(att) + ext = _guess_ext(att, kind) + fname = f"qq_{uuid.uuid4().hex[:12]}{ext}" + fpath = os.path.join(_TEMP_DIR, fname) + try: + async with sess.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp: + resp.raise_for_status() + body = await resp.read() + with open(fpath, "wb") as f: + f.write(body) + saved.append((kind, f"temp/{fname}")) + print(f"[QQ] downloaded {kind} -> temp/{fname} ({len(body)} bytes)") + except Exception as e: + print(f"[QQ] download failed {url}: {e}") + return saved + + +def _build_prompt(text, attachments): + """把文本与附件路径合并成给 agent 的 prompt。""" + if not attachments: + return text + lines = [] + for kind, path in attachments: + if kind == "语音": + lines.append(f"[TIPS] 收到{kind}文件 {path}(QQ 语音通常为 silk 编码,需先转码再处理)") + elif kind == "图片": + lines.append(f"[TIPS] 收到{kind} {path}(可用 vision 查看)") + else: + lines.append(f"[TIPS] 收到{kind} {path}") + head = "\n".join(lines) + if text: + return f"{head}\n{text}" + return f"{head}\n请查看后等待下一步指令。" + + + def _build_intents(): try: return botpy.Intents(public_messages=True, direct_message=True) @@ -81,6 +178,34 @@ async def send_text(self, chat_id, content, *, msg_id=None, is_group=False): except Exception: await api(**{key: chat_id, "msg_type": 0, "content": part, "msg_id": msg_id, "msg_seq": seq}) + async def send_done(self, chat_id, raw_text, *, msg_id=None, is_group=False): + # 先发清理后的文本(去掉 [FILE:] 标记),再把文件作为富媒体发出 + files = [p for p in extract_files(raw_text) if os.path.exists(p)] + body = strip_files(clean_reply(raw_text)) + if body and body != "...": + await self.send_text(chat_id, body, msg_id=msg_id, is_group=is_group) + for path in files: + await self._send_file(chat_id, path, msg_id=msg_id, is_group=is_group) + + async def _send_file(self, chat_id, path, *, msg_id=None, is_group=False): + """QQ 富媒体出站:本地文件 -> 公网URL -> 腾讯反向拉取。失败则降级为文本提示。""" + name = os.path.basename(path) + file_type = _qq_file_type(path) + try: + url = await asyncio.to_thread(puburl.publish, path) + if not url: + raise RuntimeError("无法生成公网URL(隧道未就绪)") + upload = self.client.api.post_group_file if is_group else self.client.api.post_c2c_file + send = self.client.api.post_group_message if is_group else self.client.api.post_c2c_message + key = "group_openid" if is_group else "openid" + media = await upload(**{key: chat_id, "file_type": file_type, "url": url}) + await send(**{key: chat_id, "msg_type": 7, "media": media, + "msg_id": msg_id, "msg_seq": _next_msg_seq()}) + print(f"[QQ] send_file ok ({name}, type={file_type}) via {url}") + except Exception as e: + print(f"[QQ] send_file failed ({name}): {e}") + await self.send_text(chat_id, f"⚠️ 文件「{name}」发送失败:{e}", msg_id=msg_id, is_group=is_group) + async def on_message(self, data, is_group=False): try: msg_id = getattr(data, "id", None) @@ -88,7 +213,8 @@ async def on_message(self, data, is_group=False): return PROCESSED_IDS.append(msg_id) content = (getattr(data, "content", "") or "").strip() - if not content: + attachments = await _download_attachments(data) + if not content and not attachments: return author = getattr(data, "author", None) user_id = str(getattr(author, "member_openid" if is_group else "user_openid", "") or getattr(author, "id", "") or "unknown") @@ -96,10 +222,11 @@ async def on_message(self, data, is_group=False): if not public_access(ALLOWED) and user_id not in ALLOWED: print(f"[QQ] unauthorized user: {user_id}") return - print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content}") + print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content!r} +{len(attachments)} attach") if content.startswith("/"): return await self.handle_command(chat_id, content, msg_id=msg_id, is_group=is_group) - asyncio.create_task(self.run_agent(chat_id, content, msg_id=msg_id, is_group=is_group)) + prompt = _build_prompt(content, attachments) + asyncio.create_task(self.run_agent(chat_id, prompt, msg_id=msg_id, is_group=is_group)) except Exception: import traceback print("[QQ] handle_message error") From d74eb84cd3c7aa82160996311ea667397e283e24 Mon Sep 17 00:00:00 2001 From: liangzexin96 Date: Fri, 5 Jun 2026 19:19:05 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat(chat):=20=E9=80=90=20turn=20=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E5=BF=83=E8=B7=B3=E6=97=A5=E5=BF=97=E4=B8=8E=E7=BB=93?= =?UTF-8?q?=E6=9D=9F=E8=AF=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - run_agent 拆分为 classic/streaming 两种模式 - 经典模式用真实进度快照替换"还在处理中"占位 - 新增 format_turn_log/send_turn/send_done_files/format_done_message - 流式模式逐 turn 推送日志, 收尾不再汇总仅补发文件, 结束发结束语 --- frontends/chatapp_common.py | 125 +++++++++++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 3 deletions(-) diff --git a/frontends/chatapp_common.py b/frontends/chatapp_common.py index befaf1c8d..fc2dfecdf 100644 --- a/frontends/chatapp_common.py +++ b/frontends/chatapp_common.py @@ -35,6 +35,23 @@ def build_help_text(commands=HELP_COMMANDS): return "📖 命令列表:\n" + "\n".join(f"{cmd} - {desc}" for cmd, desc in commands) +def _extract_progress(text): + """从 agent 的流式文本中提取一句可读的进度快照。 + 优先取最新 ...,否则取最后一行有意义的文本。""" + if not text: + return "" + ms = re.findall(r"(.*?)", text, re.DOTALL) + if ms: + s = ms[-1].strip() + if s: + return s[:300] + for line in reversed(text.splitlines()): + ln = line.strip() + if ln and not ln.startswith("```") and not ln.startswith("<"): + return ln[:300] + return "" + + HELP_TEXT = build_help_text() FILE_HINT = "If you need to show files to user, use [FILE:filepath] in your response." TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] @@ -259,6 +276,10 @@ class AgentChatMixin: source = "chat" split_limit = 1500 ping_interval = 20 + # 流式逐 turn 心跳:开启后每个 turn 跑完即把该 turn 详情作为日志推送, + # 不再发"还在处理中"占位,也不在收尾时重复汇总全部 turn。默认关闭, + # 仅在能承受多条出站消息的前端(如 QQ)启用。 + stream_turns = False def __init__(self, agent, user_tasks): self.agent, self.user_tasks = agent, user_tasks @@ -269,6 +290,33 @@ async def send_text(self, chat_id, content, **ctx): async def send_done(self, chat_id, raw_text, **ctx): await self.send_text(chat_id, build_done_text(raw_text), **ctx) + @staticmethod + def format_turn_log(turn_no, text): + """把单个 turn 的累积文本整理成可读日志。 + 保留 (监控用),剔除 (冗长),并去掉重复的 + "LLM Running (Turn N) ..." 头,避免与外层 Turn 标记重复。""" + body = re.sub(r".*?", "", text or "", flags=re.DOTALL) + body = re.sub(r"^\s*\*{0,2}(?:LLM Running )?\(?Turn \d+\)? ?\.\.\.\*{0,2}\s*", "", body) + body = re.sub(r"\n{3,}", "\n\n", body).strip() + return f"📍 Turn {turn_no}\n{body or '(无文本输出)'}" + + async def send_turn(self, chat_id, turn_no, text, **ctx): + """逐 turn 心跳推送。容错:单条心跳发送失败不应中断整个任务。""" + try: + await self.send_text(chat_id, self.format_turn_log(turn_no, text), **ctx) + except Exception as e: + print(f"[{self.label}] send_turn {turn_no} failed: {e}") + + async def send_done_files(self, chat_id, raw_text, **ctx): + """流式模式收尾:只补发生成的文件,不再重复汇总全部 turn 文本。 + 基类无富媒体能力,默认空操作;有富媒体的前端(如 QQ)覆盖此方法。""" + return + + @staticmethod + def format_done_message(turn_count): + """对话完全结束后的结束语提示。""" + return f"✅ 本次任务已全部结束,共 {turn_count} 个 turn。可以下达新指令了。" + async def handle_command(self, chat_id, cmd, **ctx): parts = (cmd or "").split() op = (parts[0] if parts else "").lower() @@ -317,23 +365,42 @@ async def handle_command(self, chat_id, cmd, **ctx): return await self.send_text(chat_id, HELP_TEXT, **ctx) async def run_agent(self, chat_id, text, **ctx): + if self.stream_turns: + return await self._run_agent_streaming(chat_id, text, **ctx) + return await self._run_agent_classic(chat_id, text, **ctx) + + async def _run_agent_classic(self, chat_id, text, **ctx): state = {"running": True} self.user_tasks[chat_id] = state try: - await self.send_text(chat_id, "思考中...", **ctx) + await self.send_text(chat_id, "🤔 思考中...", **ctx) dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source) last_ping = time.time() + latest = "" # 最近一次的进度原文(当前 turn 文本) + sent_progress = "" # 上次实际发出的进度快照,去重用 while state["running"]: try: item = await asyncio.to_thread(dq.get, True, 3) except Q.Empty: + # 节流:每 ping_interval 发一次进度快照(沿用原有消息预算, + # 不额外增加被动回复条数,只是把"处理中"换成真实进度) if self.agent.is_running and time.time() - last_ping > self.ping_interval: - await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx) + snap = _extract_progress(latest) + if snap and snap != sent_progress: + await self.send_text(chat_id, f"⚙️ {snap}", **ctx) + sent_progress = snap + else: + await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx) last_ping = time.time() continue if "done" in item: - await self.send_done(chat_id, item.get("done", ""), **ctx) + # 被 /stop 打断时不再补发完整结果,交给下方"已停止"提示 + if state["running"]: + await self.send_done(chat_id, item.get("done", ""), **ctx) break + if "next" in item: + outs = item.get("outputs") or [] + latest = outs[-1] if outs else (item.get("next", "") or "") if not state["running"]: await self.send_text(chat_id, "⏹️ 已停止", **ctx) except Exception as e: @@ -344,6 +411,58 @@ async def run_agent(self, chat_id, text, **ctx): finally: self.user_tasks.pop(chat_id, None) + async def _run_agent_streaming(self, chat_id, text, **ctx): + """逐 turn 实时心跳模式:每个 turn 跑完即推送该 turn 详情,作为实时日志。 + 不发"还在处理中"占位;收尾时不再重复汇总全部 turn,只补发生成的文件。 + + turn 完成判定:put_task 产出的 'next' 项里 turn 号递增,意味着上一个 + turn 已经结束。此时 outputs[-2] 是刚结束 turn 的完整文本(agentmain 用 + turn_resps[-2:] 携带最近两个 turn)。done 项携带全部 turn 文本,用于补发 + 尚未推送的尾部 turn。""" + state = {"running": True} + self.user_tasks[chat_id] = state + sent_turn = 0 # 已推送的最大 turn 号 + try: + await self.send_text(chat_id, "🤔 开始处理,将逐 turn 推送日志...", **ctx) + dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source) + while state["running"]: + try: + item = await asyncio.to_thread(dq.get, True, 3) + except Q.Empty: + continue + if "next" in item: + cur = item.get("turn", 0) + outs = item.get("outputs") or [] + # turn 号已推进且恰好领先 1 → 上一个 turn (cur-1) 刚结束, + # 其完整文本就在 outs[-2](agentmain 用 turn_resps[-2:] 携带)。 + # 只在严格连续(cur-1 == sent_turn+1)时推送,避免重复; + # 万一出现跳跃,留给 done 阶段统一补发,保证不漏不重。 + if cur - 1 == sent_turn + 1 and len(outs) >= 2: + await self.send_turn(chat_id, cur - 1, outs[-2], **ctx) + sent_turn = cur - 1 + continue + if "done" in item: + if state["running"]: + all_outs = item.get("outputs") or [] + # 补发所有尚未推送的 turn(含最后一个 turn) + for t in range(sent_turn + 1, len(all_outs) + 1): + await self.send_turn(chat_id, t, all_outs[t - 1], **ctx) + sent_turn = len(all_outs) + # 只补发生成的文件,不重复汇总全部 turn 文本 + await self.send_done_files(chat_id, item.get("done", ""), **ctx) + # 对话完全结束后发结束语提示 + await self.send_text(chat_id, self.format_done_message(sent_turn), **ctx) + break + if not state["running"]: + await self.send_text(chat_id, "⏹️ 已停止", **ctx) + except Exception as e: + import traceback + print(f"[{self.label}] run_agent(stream) error: {e}") + traceback.print_exc() + await self.send_text(chat_id, f"❌ 错误: {e}", **ctx) + finally: + self.user_tasks.pop(chat_id, None) + from agentmain import GeneraticAgent as _GA from continue_cmd import handle_frontend_command as _handle_continue_frontend, install as _install_continue, reset_conversation as _reset_conversation From b395af27a62d498d9f03a652989bbec886ec9394 Mon Sep 17 00:00:00 2001 From: liangzexin96 Date: Fri, 5 Jun 2026 19:19:05 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat(qq):=20=E5=85=A5=E7=AB=99=E9=99=84?= =?UTF-8?q?=E4=BB=B6=E9=9A=94=E7=A6=BB=E7=9B=AE=E5=BD=95=E4=B8=8E=E5=A4=A7?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8B=E8=BD=BD=E9=93=BE=E6=8E=A5=E9=99=8D?= =?UTF-8?q?=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 入站附件下载到 temp/qq_inbox 并按 TTL 自动清理 - 出站时排除入站附件, 避免回传用户自己发来的文件 - _send_file 富媒体被腾讯拒收时降级为公网下载链接(TTL 1h) - 启用 stream_turns; send_done_files 收尾仅补发生成的文件 --- frontends/qqapp.py | 73 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/frontends/qqapp.py b/frontends/qqapp.py index b2d982f40..3cffd9bb0 100644 --- a/frontends/qqapp.py +++ b/frontends/qqapp.py @@ -3,6 +3,8 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) _TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp') +_INBOX_DIR = os.path.join(_TEMP_DIR, 'qq_inbox') # 入站附件独立存放,便于清理 +_INBOX_TTL = 86400 # 入站附件保留秒数,超过自动清理 from agentmain import GeneraticAgent from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text, extract_files, strip_files, clean_reply from llmcore import mykeys @@ -72,13 +74,39 @@ def _kind_label(att): return "文件" +def _is_inbound(path): + """判断路径是否落在入站目录 temp/qq_inbox/ 内(兼容相对/绝对路径)。""" + try: + ap = os.path.abspath(path) + base = os.path.abspath(_INBOX_DIR) + return os.path.commonpath([ap, base]) == base + except (ValueError, OSError): + return False + + +def _cleanup_inbox(): + """清理 temp/qq_inbox/ 下超过 TTL 的旧附件。""" + now = time.time() + try: + for name in os.listdir(_INBOX_DIR): + p = os.path.join(_INBOX_DIR, name) + try: + if now - os.path.getmtime(p) > _INBOX_TTL: + os.remove(p) + except OSError: + pass + except FileNotFoundError: + pass + + async def _download_attachments(data): - """下载消息附件到 temp/,返回 [(kind_label, 'temp/xxx'), ...]。失败的跳过。""" + """下载消息附件到 temp/qq_inbox/,返回 [(kind_label, 'temp/qq_inbox/xxx'), ...]。失败的跳过。""" atts = getattr(data, "attachments", None) or [] if not atts: return [] import aiohttp - os.makedirs(_TEMP_DIR, exist_ok=True) + os.makedirs(_INBOX_DIR, exist_ok=True) + _cleanup_inbox() saved = [] async with aiohttp.ClientSession() as sess: for att in atts: @@ -92,15 +120,15 @@ async def _download_attachments(data): kind = _kind_label(att) ext = _guess_ext(att, kind) fname = f"qq_{uuid.uuid4().hex[:12]}{ext}" - fpath = os.path.join(_TEMP_DIR, fname) + fpath = os.path.join(_INBOX_DIR, fname) try: async with sess.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp: resp.raise_for_status() body = await resp.read() with open(fpath, "wb") as f: f.write(body) - saved.append((kind, f"temp/{fname}")) - print(f"[QQ] downloaded {kind} -> temp/{fname} ({len(body)} bytes)") + saved.append((kind, f"temp/qq_inbox/{fname}")) + print(f"[QQ] downloaded {kind} -> temp/qq_inbox/{fname} ({len(body)} bytes)") except Exception as e: print(f"[QQ] download failed {url}: {e}") return saved @@ -161,6 +189,7 @@ async def on_direct_message_create(self, message): class QQApp(AgentChatMixin): label, source, split_limit = "QQ", "qq", 1500 + stream_turns = True # 逐 turn 实时心跳:每个 turn 跑完即推送该 turn 日志 def __init__(self): super().__init__(agent, USER_TASKS) @@ -180,17 +209,33 @@ async def send_text(self, chat_id, content, *, msg_id=None, is_group=False): async def send_done(self, chat_id, raw_text, *, msg_id=None, is_group=False): # 先发清理后的文本(去掉 [FILE:] 标记),再把文件作为富媒体发出 - files = [p for p in extract_files(raw_text) if os.path.exists(p)] + files = [p for p in extract_files(raw_text) + if os.path.exists(p) and not _is_inbound(p)] body = strip_files(clean_reply(raw_text)) if body and body != "...": await self.send_text(chat_id, body, msg_id=msg_id, is_group=is_group) for path in files: await self._send_file(chat_id, path, msg_id=msg_id, is_group=is_group) + async def send_done_files(self, chat_id, raw_text, *, msg_id=None, is_group=False): + # 流式逐 turn 模式收尾:turn 日志已实时推送完毕,这里只补发生成的文件, + # 不再重复汇总全部 turn 文本。 + files = [p for p in extract_files(raw_text) + if os.path.exists(p) and not _is_inbound(p)] + for path in files: + await self._send_file(chat_id, path, msg_id=msg_id, is_group=is_group) + async def _send_file(self, chat_id, path, *, msg_id=None, is_group=False): - """QQ 富媒体出站:本地文件 -> 公网URL -> 腾讯反向拉取。失败则降级为文本提示。""" + """QQ 富媒体出站:本地文件 -> 公网URL -> 腾讯反向拉取。 + + 腾讯反向拉取对文件体积有上限(实测 1MB 文档可发,2.9MB pdf / 11.5MB docx + 会被拒,报 "download file error")。原生富媒体发送失败时,降级为把公网下载 + 链接作为文本发出——该链接经 cloudflared 隧道直连本地文件服务,不受腾讯体积 + 限制,用户可在 TTL(默认 1 小时)内手动下载。""" name = os.path.basename(path) file_type = _qq_file_type(path) + size = os.path.getsize(path) if os.path.exists(path) else 0 + url = None try: url = await asyncio.to_thread(puburl.publish, path) if not url: @@ -201,10 +246,18 @@ async def _send_file(self, chat_id, path, *, msg_id=None, is_group=False): media = await upload(**{key: chat_id, "file_type": file_type, "url": url}) await send(**{key: chat_id, "msg_type": 7, "media": media, "msg_id": msg_id, "msg_seq": _next_msg_seq()}) - print(f"[QQ] send_file ok ({name}, type={file_type}) via {url}") + print(f"[QQ] send_file ok ({name}, type={file_type}, {size}B) via {url}") except Exception as e: - print(f"[QQ] send_file failed ({name}): {e}") - await self.send_text(chat_id, f"⚠️ 文件「{name}」发送失败:{e}", msg_id=msg_id, is_group=is_group) + print(f"[QQ] send_file failed ({name}, {size}B): {e}") + if url: + # 降级:原生富媒体被腾讯拒收(多见于大文件),改发公网下载链接 + mb = size / 1024 / 1024 + tip = (f"📎 文件「{name}」({mb:.1f}MB)超出 QQ 直传体积限制," + f"已转为下载链接(1 小时内有效):\n{url}") + await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group) + print(f"[QQ] send_file degraded to link ({name}) via {url}") + else: + await self.send_text(chat_id, f"⚠️ 文件「{name}」发送失败:{e}", msg_id=msg_id, is_group=is_group) async def on_message(self, data, is_group=False): try: From a19763c13a58305a5f18eebaf9276537fbe3de77 Mon Sep 17 00:00:00 2001 From: liangzexin96 Date: Fri, 5 Jun 2026 19:19:05 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat(qq):=20=E9=99=84=E4=BB=B6=E7=BC=93?= =?UTF-8?q?=E5=86=B2=E4=B8=8E=20/clearfiles,=20=E4=BF=AE=E5=A4=8D=E8=BF=9E?= =?UTF-8?q?=E5=8F=91=E8=AE=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PENDING 缓冲附件, 待用户文字指令一并触发模型 - 任务运行中收到附件给出缓存回执, 防止重复发送 - /clearfiles 撤销已缓存附件(命令分发前拦截) - 并发分支改用缓冲累计数替代单条 len(attachments), 修复连发文件始终显示"已收到1个"的问题 --- frontends/qqapp.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/frontends/qqapp.py b/frontends/qqapp.py index 3cffd9bb0..f5a03eb03 100644 --- a/frontends/qqapp.py +++ b/frontends/qqapp.py @@ -22,6 +22,9 @@ APP_SECRET = str(mykeys.get("qq_app_secret", "") or "").strip() ALLOWED = {str(x).strip() for x in mykeys.get("qq_allowed_users", []) if str(x).strip()} PROCESSED_IDS, USER_TASKS = deque(maxlen=1000), {} +# 缓冲区:用户发来的附件先暂存,等用户发文字指令再合并触发模型 +# {chat_id: [(kind_label, 'temp/qq_inbox/xxx'), ...]} +PENDING = {} SEQ_LOCK, MSG_SEQ = threading.Lock(), 1 @@ -277,8 +280,43 @@ async def on_message(self, data, is_group=False): return print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content!r} +{len(attachments)} attach") if content.startswith("/"): + if content.strip().lower() == "/clearfiles": + pend = PENDING.pop(chat_id, []) + if pend: + kinds = "、".join(sorted({k for k, _ in pend})) + tip = f"🗑️ 已撤销缓存的 {len(pend)} 个附件({kinds})。" + else: + tip = "📭 当前没有缓存的附件。" + return await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group) return await self.handle_command(chat_id, content, msg_id=msg_id, is_group=is_group) - prompt = _build_prompt(content, attachments) + # 1) 先把本条消息的附件存入缓冲(不立即触发模型) + if attachments: + PENDING.setdefault(chat_id, []).extend(attachments) + # 2) 并发保护:该会话已有任务在跑,拒绝新指令,提示可 /stop 中断 + if chat_id in USER_TASKS: + if attachments: + pend = PENDING.get(chat_id, []) + kinds = "、".join(sorted({k for k, _ in pend})) + tip = (f"📥 已收到本条 {len(attachments)} 个附件并缓存,当前共缓存 {len(pend)} 个({kinds}),无需重发。" + f"\n⏳ 但当前正在处理上一条指令,附件会在你下达新指令时一并带上。" + f"发送 /stop 可中断当前任务后立即下达,发送 /clearfiles 可撤销已缓存的附件。") + else: + tip = "⏳ 正在处理上一条指令。发送 /stop 可中断后再下达新指令。" + return await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group) + # 3) 只有附件、没有文字指令 → 回执并等待文字 + if not content: + pend = PENDING.get(chat_id, []) + if pend: + kinds = "、".join(sorted({k for k, _ in pend})) + return await self.send_text( + chat_id, + f"📥 已收到 {len(pend)} 个附件({kinds}),已缓存。请发送文字说明要做什么,我再开始处理。" + f"\n发送 /clearfiles 可撤销已缓存的附件。", + msg_id=msg_id, is_group=is_group) + return + # 4) 有文字指令 → 合并缓冲的附件一起触发,清空缓冲 + buffered = PENDING.pop(chat_id, []) + prompt = _build_prompt(content, buffered) asyncio.create_task(self.run_agent(chat_id, prompt, msg_id=msg_id, is_group=is_group)) except Exception: import traceback