From 0b98adae9c771e2689a79ac624b0e404f00bba9a Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Fri, 10 Apr 2026 19:44:57 -0400 Subject: [PATCH 1/3] Add generation exception handler to LLM, VLM engines --- src/engine/ov_genai/llm.py | 7 +++++++ src/engine/ov_genai/vlm.py | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/src/engine/ov_genai/llm.py b/src/engine/ov_genai/llm.py index 12bcdf8..0b3fe42 100755 --- a/src/engine/ov_genai/llm.py +++ b/src/engine/ov_genai/llm.py @@ -141,8 +141,15 @@ async def _run_generation(): generation_kwargs, streamer ) + def _generation_exception_handler(task: asyncio.Task): + exc = task.exception() + if exc and not task.cancelled(): + # Force break the below loop + streamer.text_queue.put_nowait(None) + raise exc gen_task = asyncio.create_task(_run_generation()) + gen_task.add_done_callback(_generation_exception_handler) try: while True: diff --git a/src/engine/ov_genai/vlm.py b/src/engine/ov_genai/vlm.py index 054bb54..6015ef7 100644 --- a/src/engine/ov_genai/vlm.py +++ b/src/engine/ov_genai/vlm.py @@ -189,8 +189,15 @@ async def _run_generation(): generation_config=generation_kwargs, streamer=streamer, ) + def _generation_exception_handler(task: asyncio.Task): + exc = task.exception() + if exc and not task.cancelled(): + # Force break the below loop + streamer.text_queue.put_nowait(None) + raise exc gen_task = asyncio.create_task(_run_generation()) + gen_task.add_done_callback(_generation_exception_handler) try: while True: From 7b5ee631cf6d82971822b55dbbd699cd45693771 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Fri, 29 May 2026 09:28:19 -0400 Subject: [PATCH 2/3] Apply exception handler to WorkerRegistry --- src/server/worker_registry.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/server/worker_registry.py b/src/server/worker_registry.py index 2e54bd5..63cc841 100644 --- a/src/server/worker_registry.py +++ b/src/server/worker_registry.py @@ -837,6 +837,16 @@ async def stream_generate(self, model_name: str, gen_config: OVGenAI_GenConfig) stream_queue=stream_queue, result_future=result_future, ) + async def _generation_exception_handler(future: asyncio.Future): + logger.error("Got stream cancel.") + exc = future.exception() + if exc and not future.cancelled(): + # Force break the below loop + logger.error("Canceling.") + await self.infer_cancel(request_id) + stream_queue.put_nowait(None) + raise exc + result_future.add_done_callback(_generation_exception_handler) # Register active request async with self._lock: From 2cb3794c7f218ec7974e0f9247e1053d38da4c12 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Fri, 29 May 2026 09:32:08 -0400 Subject: [PATCH 3/3] Ensure ChunkStreamer cancel is passed --- src/engine/ov_genai/streamers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/engine/ov_genai/streamers.py b/src/engine/ov_genai/streamers.py index 839f87c..d7844bf 100644 --- a/src/engine/ov_genai/streamers.py +++ b/src/engine/ov_genai/streamers.py @@ -53,6 +53,9 @@ def write(self, token: Union[int, List[int]]) -> openvino_genai.StreamingStatus: def cancel(self) -> None: """Signal cancellation of the streaming generation.""" + # Signal completion to unblock any waiting consumer + # Write performs the same task. However, if there is nothing to be written yet, it may not be received by the consumer + self.text_queue.put_nowait(None) self._cancelled.set() def is_cancelled(self) -> bool: