diff --git a/src/engine/ov_genai/llm.py b/src/engine/ov_genai/llm.py index 12bcdf8..0b3fe42 100755 --- a/src/engine/ov_genai/llm.py +++ b/src/engine/ov_genai/llm.py @@ -141,8 +141,15 @@ async def _run_generation(): generation_kwargs, streamer ) + def _generation_exception_handler(task: asyncio.Task): + exc = task.exception() + if exc and not task.cancelled(): + # Force break the below loop + streamer.text_queue.put_nowait(None) + raise exc gen_task = asyncio.create_task(_run_generation()) + gen_task.add_done_callback(_generation_exception_handler) try: while True: diff --git a/src/engine/ov_genai/streamers.py b/src/engine/ov_genai/streamers.py index 839f87c..d7844bf 100644 --- a/src/engine/ov_genai/streamers.py +++ b/src/engine/ov_genai/streamers.py @@ -53,6 +53,9 @@ def write(self, token: Union[int, List[int]]) -> openvino_genai.StreamingStatus: def cancel(self) -> None: """Signal cancellation of the streaming generation.""" + # Signal completion to unblock any waiting consumer + # Write performs the same task. However, if there is nothing to be written yet, it may not be received by the consumer + self.text_queue.put_nowait(None) self._cancelled.set() def is_cancelled(self) -> bool: diff --git a/src/engine/ov_genai/vlm.py b/src/engine/ov_genai/vlm.py index 8d10f77..2e40a95 100644 --- a/src/engine/ov_genai/vlm.py +++ b/src/engine/ov_genai/vlm.py @@ -202,8 +202,15 @@ async def _run_generation(): generation_config=generation_kwargs, streamer=streamer, ) + def _generation_exception_handler(task: asyncio.Task): + exc = task.exception() + if exc and not task.cancelled(): + # Force break the below loop + streamer.text_queue.put_nowait(None) + raise exc gen_task = asyncio.create_task(_run_generation()) + gen_task.add_done_callback(_generation_exception_handler) try: while True: diff --git a/src/server/worker_registry.py b/src/server/worker_registry.py index 2e54bd5..63cc841 100644 --- a/src/server/worker_registry.py +++ b/src/server/worker_registry.py @@ -837,6 +837,16 @@ async def stream_generate(self, model_name: str, gen_config: OVGenAI_GenConfig) stream_queue=stream_queue, result_future=result_future, ) + async def _generation_exception_handler(future: asyncio.Future): + logger.error("Got stream cancel.") + exc = future.exception() + if exc and not future.cancelled(): + # Force break the below loop + logger.error("Canceling.") + await self.infer_cancel(request_id) + stream_queue.put_nowait(None) + raise exc + result_future.add_done_callback(_generation_exception_handler) # Register active request async with self._lock: