fix: stop webchat stream from crashing on queue polling errors#6123
fix: stop webchat stream from crashing on queue polling errors#6123stablegenius49 wants to merge 1 commit intoAstrBotDevs:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the robustness of the webchat SSE stream by refactoring its error handling mechanism. It introduces a dedicated helper function to gracefully manage various queue polling exceptions, preventing stream crashes and ensuring a more stable user experience, particularly during client disconnections or transient errors. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Hey - 我发现了 1 个问题,并给出了一些整体性的反馈:
- 建议为
_poll_webchat_stream_result添加简短的文档字符串(docstring)和类型注解,把(result, should_break)的返回约定,以及预期的队列接口为后续维护者明确说明。 - 由于
_poll_webchat_stream_result现在是流式循环行为中的关键部分,你可能还想把if not result: continue这段逻辑也集中到这个 helper 里,这样调用方只需要根据should_break做分支,而由 helper 完整负责轮询/继续的语义。
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider adding a short docstring and type hints to `_poll_webchat_stream_result` to make the `(result, should_break)` return contract and expected queue interface explicit for future maintainers.
- Since `_poll_webchat_stream_result` is now a key part of the streaming loop behavior, you might want to centralize the `if not result: continue` logic into the helper as well, so the caller only branches on `should_break` and the helper fully owns the polling/continue semantics.
## Individual Comments
### Comment 1
<location path="tests/test_chat_route.py" line_range="24-25" />
<code_context>
+ return self._result
+
+
+@pytest.mark.asyncio
+async def test_poll_webchat_stream_result_breaks_on_cancelled_error():
+ result, should_break = await _poll_webchat_stream_result(
+ _QueueThatRaises(asyncio.CancelledError()),
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case for the timeout (`asyncio.TimeoutError`) branch of `_poll_webchat_stream_result`.
This path has distinct behavior for `asyncio.TimeoutError` (returning `(None, False)` so the caller continues looping), but we currently only test success, generic exceptions, and `CancelledError`. Please add a test that simulates a timeout and asserts `result is None` and `should_break is False`. You can keep it fast by using a queue stub whose `get()` raises `asyncio.TimeoutError` directly, similar to `_QueueThatRaises`.
</issue_to_address>帮我变得更有用!请对每条评论点 👍 或 👎,我会根据你的反馈来改进评审质量。
Original comment in English
Hey - I've found 1 issue, and left some high level feedback:
- Consider adding a short docstring and type hints to
_poll_webchat_stream_resultto make the(result, should_break)return contract and expected queue interface explicit for future maintainers. - Since
_poll_webchat_stream_resultis now a key part of the streaming loop behavior, you might want to centralize theif not result: continuelogic into the helper as well, so the caller only branches onshould_breakand the helper fully owns the polling/continue semantics.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider adding a short docstring and type hints to `_poll_webchat_stream_result` to make the `(result, should_break)` return contract and expected queue interface explicit for future maintainers.
- Since `_poll_webchat_stream_result` is now a key part of the streaming loop behavior, you might want to centralize the `if not result: continue` logic into the helper as well, so the caller only branches on `should_break` and the helper fully owns the polling/continue semantics.
## Individual Comments
### Comment 1
<location path="tests/test_chat_route.py" line_range="24-25" />
<code_context>
+ return self._result
+
+
+@pytest.mark.asyncio
+async def test_poll_webchat_stream_result_breaks_on_cancelled_error():
+ result, should_break = await _poll_webchat_stream_result(
+ _QueueThatRaises(asyncio.CancelledError()),
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case for the timeout (`asyncio.TimeoutError`) branch of `_poll_webchat_stream_result`.
This path has distinct behavior for `asyncio.TimeoutError` (returning `(None, False)` so the caller continues looping), but we currently only test success, generic exceptions, and `CancelledError`. Please add a test that simulates a timeout and asserts `result is None` and `should_break is False`. You can keep it fast by using a queue stub whose `get()` raises `asyncio.TimeoutError` directly, similar to `_QueueThatRaises`.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| @pytest.mark.asyncio | ||
| async def test_poll_webchat_stream_result_breaks_on_cancelled_error(): |
There was a problem hiding this comment.
suggestion (testing): 为 _poll_webchat_stream_result 的超时(asyncio.TimeoutError)分支添加一个测试用例。
这条路径对 asyncio.TimeoutError 有不同的行为(返回 (None, False),使调用方继续循环),但目前我们只测试了成功情况、通用异常和 CancelledError。请添加一个模拟超时的测试,并断言 result is None 且 should_break is False。你可以像 _QueueThatRaises 一样,使用一个其 get() 会直接抛出 asyncio.TimeoutError 的队列 stub,从而保持测试运行速度较快。
Original comment in English
suggestion (testing): Add a test case for the timeout (asyncio.TimeoutError) branch of _poll_webchat_stream_result.
This path has distinct behavior for asyncio.TimeoutError (returning (None, False) so the caller continues looping), but we currently only test success, generic exceptions, and CancelledError. Please add a test that simulates a timeout and asserts result is None and should_break is False. You can keep it fast by using a queue stub whose get() raises asyncio.TimeoutError directly, similar to _QueueThatRaises.
There was a problem hiding this comment.
Code Review
This pull request effectively addresses a critical issue where webchat stream crashes due to queue polling errors by refactoring the polling logic into a dedicated helper function, _poll_webchat_stream_result. This significantly improves the robustness and maintainability of the streaming mechanism. The addition of comprehensive unit tests for the new helper function is also a great step towards ensuring its reliability. The changes correctly differentiate between CancelledError (clean disconnect) and generic exceptions, preventing stream crashes and improving error handling.
| async def _poll_webchat_stream_result(back_queue, username: str): | ||
| try: | ||
| result = await asyncio.wait_for(back_queue.get(), timeout=1) | ||
| except asyncio.TimeoutError: | ||
| return None, False | ||
| except asyncio.CancelledError: | ||
| logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。") | ||
| return None, True | ||
| except Exception as e: | ||
| logger.error(f"WebChat stream error: {e}") | ||
| return None, False | ||
| return result, False |
There was a problem hiding this comment.
The new _poll_webchat_stream_result function is a great addition for encapsulating the polling logic and error handling. To further enhance maintainability and clarity, consider adding a docstring to explain its purpose, parameters, and return values. Additionally, specifying the type hint for back_queue as asyncio.Queue would improve type checking and readability.
async def _poll_webchat_stream_result(back_queue: asyncio.Queue, username: str):
"""Polls the back_queue for results, handling timeouts, cancellations, and other exceptions.
Args:
back_queue: The asyncio.Queue to poll for results.
username: The username associated with the webchat stream.
Returns:
A tuple containing the result (or None) and a boolean indicating if the stream should break.
"""
try:
result = await asyncio.wait_for(back_queue.get(), timeout=1)
except asyncio.TimeoutError:
return None, False
except asyncio.CancelledError:
logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
return None, True
except Exception as e:
logger.error(f"WebChat stream error: {e}")
return None, False
return result, False
Modifications / 改动点
Fix the webchat SSE loop in
astrbot/dashboard/routes/chat.pyso queue polling errors do not fall through toif not resultwith an unbound local.Treat
CancelledErroras a clean client disconnect (break) and keep generic queue polling errors non-fatal (continue).Add regression tests for the new polling helper to cover success, generic exceptions, and
CancelledError.This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
Closes #6118.
Checklist / 检查清单
requirements.txt和pyproject.toml文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations inrequirements.txtandpyproject.toml.Summary by Sourcery
更稳健地处理网页聊天流式队列轮询错误,防止 SSE 崩溃,并更干净地处理客户端断开连接。
Bug Fixes:
Tests:
CancelledError处理。Original summary in English
Summary by Sourcery
Handle webchat streaming queue polling errors more robustly to prevent SSE crashes and treat client disconnects cleanly.
Bug Fixes:
Tests: