Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 122 additions & 3 deletions frontends/chatapp_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ def build_help_text(commands=HELP_COMMANDS):
return "📖 命令列表:\n" + "\n".join(f"{cmd} - {desc}" for cmd, desc in commands)


def _extract_progress(text):
"""从 agent 的流式文本中提取一句可读的进度快照。
优先取最新 <summary>...</summary>,否则取最后一行有意义的文本。"""
if not text:
return ""
ms = re.findall(r"<summary>(.*?)</summary>", text, re.DOTALL)
if ms:
s = ms[-1].strip()
if s:
return s[:300]
for line in reversed(text.splitlines()):
ln = line.strip()
if ln and not ln.startswith("```") and not ln.startswith("<"):
return ln[:300]
return ""


HELP_TEXT = build_help_text()
FILE_HINT = "If you need to show files to user, use [FILE:filepath] in your response."
TAG_PATS = [r"<" + t + r">.*?</" + t + r">" for t in ("thinking", "summary", "tool_use", "file_content")]
Expand Down Expand Up @@ -259,6 +276,10 @@ class AgentChatMixin:
source = "chat"
split_limit = 1500
ping_interval = 20
# 流式逐 turn 心跳:开启后每个 turn 跑完即把该 turn 详情作为日志推送,
# 不再发"还在处理中"占位,也不在收尾时重复汇总全部 turn。默认关闭,
# 仅在能承受多条出站消息的前端(如 QQ)启用。
stream_turns = False

def __init__(self, agent, user_tasks):
self.agent, self.user_tasks = agent, user_tasks
Expand All @@ -269,6 +290,33 @@ async def send_text(self, chat_id, content, **ctx):
async def send_done(self, chat_id, raw_text, **ctx):
await self.send_text(chat_id, build_done_text(raw_text), **ctx)

@staticmethod
def format_turn_log(turn_no, text):
"""把单个 turn 的累积文本整理成可读日志。
保留 <summary>(监控用),剔除 <thinking>(冗长),并去掉重复的
"LLM Running (Turn N) ..." 头,避免与外层 Turn 标记重复。"""
body = re.sub(r"<thinking>.*?</thinking>", "", text or "", flags=re.DOTALL)
body = re.sub(r"^\s*\*{0,2}(?:LLM Running )?\(?Turn \d+\)? ?\.\.\.\*{0,2}\s*", "", body)
body = re.sub(r"\n{3,}", "\n\n", body).strip()
return f"📍 Turn {turn_no}\n{body or '(无文本输出)'}"

async def send_turn(self, chat_id, turn_no, text, **ctx):
"""逐 turn 心跳推送。容错:单条心跳发送失败不应中断整个任务。"""
try:
await self.send_text(chat_id, self.format_turn_log(turn_no, text), **ctx)
except Exception as e:
print(f"[{self.label}] send_turn {turn_no} failed: {e}")

async def send_done_files(self, chat_id, raw_text, **ctx):
"""流式模式收尾:只补发生成的文件,不再重复汇总全部 turn 文本。
基类无富媒体能力,默认空操作;有富媒体的前端(如 QQ)覆盖此方法。"""
return

@staticmethod
def format_done_message(turn_count):
"""对话完全结束后的结束语提示。"""
return f"✅ 本次任务已全部结束,共 {turn_count} 个 turn。可以下达新指令了。"

async def handle_command(self, chat_id, cmd, **ctx):
parts = (cmd or "").split()
op = (parts[0] if parts else "").lower()
Expand Down Expand Up @@ -317,23 +365,42 @@ async def handle_command(self, chat_id, cmd, **ctx):
return await self.send_text(chat_id, HELP_TEXT, **ctx)

async def run_agent(self, chat_id, text, **ctx):
if self.stream_turns:
return await self._run_agent_streaming(chat_id, text, **ctx)
return await self._run_agent_classic(chat_id, text, **ctx)

async def _run_agent_classic(self, chat_id, text, **ctx):
state = {"running": True}
self.user_tasks[chat_id] = state
try:
await self.send_text(chat_id, "思考中...", **ctx)
await self.send_text(chat_id, "🤔 思考中...", **ctx)
dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source)
last_ping = time.time()
latest = "" # 最近一次的进度原文(当前 turn 文本)
sent_progress = "" # 上次实际发出的进度快照,去重用
while state["running"]:
try:
item = await asyncio.to_thread(dq.get, True, 3)
except Q.Empty:
# 节流:每 ping_interval 发一次进度快照(沿用原有消息预算,
# 不额外增加被动回复条数,只是把"处理中"换成真实进度)
if self.agent.is_running and time.time() - last_ping > self.ping_interval:
await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx)
snap = _extract_progress(latest)
if snap and snap != sent_progress:
await self.send_text(chat_id, f"⚙️ {snap}", **ctx)
sent_progress = snap
else:
await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx)
last_ping = time.time()
continue
if "done" in item:
await self.send_done(chat_id, item.get("done", ""), **ctx)
# 被 /stop 打断时不再补发完整结果,交给下方"已停止"提示
if state["running"]:
await self.send_done(chat_id, item.get("done", ""), **ctx)
break
if "next" in item:
outs = item.get("outputs") or []
latest = outs[-1] if outs else (item.get("next", "") or "")
if not state["running"]:
await self.send_text(chat_id, "⏹️ 已停止", **ctx)
except Exception as e:
Expand All @@ -344,6 +411,58 @@ async def run_agent(self, chat_id, text, **ctx):
finally:
self.user_tasks.pop(chat_id, None)

async def _run_agent_streaming(self, chat_id, text, **ctx):
"""逐 turn 实时心跳模式:每个 turn 跑完即推送该 turn 详情,作为实时日志。
不发"还在处理中"占位;收尾时不再重复汇总全部 turn,只补发生成的文件。

turn 完成判定:put_task 产出的 'next' 项里 turn 号递增,意味着上一个
turn 已经结束。此时 outputs[-2] 是刚结束 turn 的完整文本(agentmain 用
turn_resps[-2:] 携带最近两个 turn)。done 项携带全部 turn 文本,用于补发
尚未推送的尾部 turn。"""
state = {"running": True}
self.user_tasks[chat_id] = state
sent_turn = 0 # 已推送的最大 turn 号
try:
await self.send_text(chat_id, "🤔 开始处理,将逐 turn 推送日志...", **ctx)
dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source)
while state["running"]:
try:
item = await asyncio.to_thread(dq.get, True, 3)
except Q.Empty:
continue
if "next" in item:
cur = item.get("turn", 0)
outs = item.get("outputs") or []
# turn 号已推进且恰好领先 1 → 上一个 turn (cur-1) 刚结束,
# 其完整文本就在 outs[-2](agentmain 用 turn_resps[-2:] 携带)。
# 只在严格连续(cur-1 == sent_turn+1)时推送,避免重复;
# 万一出现跳跃,留给 done 阶段统一补发,保证不漏不重。
if cur - 1 == sent_turn + 1 and len(outs) >= 2:
await self.send_turn(chat_id, cur - 1, outs[-2], **ctx)
sent_turn = cur - 1
continue
if "done" in item:
if state["running"]:
all_outs = item.get("outputs") or []
# 补发所有尚未推送的 turn(含最后一个 turn)
for t in range(sent_turn + 1, len(all_outs) + 1):
await self.send_turn(chat_id, t, all_outs[t - 1], **ctx)
sent_turn = len(all_outs)
# 只补发生成的文件,不重复汇总全部 turn 文本
await self.send_done_files(chat_id, item.get("done", ""), **ctx)
# 对话完全结束后发结束语提示
await self.send_text(chat_id, self.format_done_message(sent_turn), **ctx)
break
if not state["running"]:
await self.send_text(chat_id, "⏹️ 已停止", **ctx)
except Exception as e:
import traceback
print(f"[{self.label}] run_agent(stream) error: {e}")
traceback.print_exc()
await self.send_text(chat_id, f"❌ 错误: {e}", **ctx)
finally:
self.user_tasks.pop(chat_id, None)


from agentmain import GeneraticAgent as _GA
from continue_cmd import handle_frontend_command as _handle_continue_frontend, install as _install_continue, reset_conversation as _reset_conversation
Expand Down
Loading