
Implement LLM streaming across adapter → client → strategy → executor pipeline #3

@rogue-socket


Summary

LLM streaming requires changes across 5 layers. The streaming type system, SSE parser, and adapter stream() methods for all 3 providers already exist and are covered by 72 tests between them, but the adapter streams are broken at runtime and nothing above the adapter layer has streaming support.

What Exists (Well-Built)

Streaming Types (llm/streaming.py, 213 lines)

  • StreamChunkType enum: START, CONTENT, TOOL_CALL_START, TOOL_CALL_CHUNK, TOOL_CALL_END, STOP, ERROR
  • StreamChunk frozen dataclass with __post_init__() validation
  • StreamingLLMResponse accumulator with from_chunks() classmethod
  • 28 tests in test_streaming_types.py
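
For orientation, a minimal sketch of how these pieces compose (the constructor field names here are illustrative assumptions, not copied from streaming.py):

from llm.streaming import StreamChunk, StreamChunkType, StreamingLLMResponse

# Assumed constructor fields; the real definitions live in llm/streaming.py.
chunks = [
    StreamChunk(type=StreamChunkType.START),
    StreamChunk(type=StreamChunkType.CONTENT, content="Hello"),
    StreamChunk(type=StreamChunkType.CONTENT, content=", world"),
    StreamChunk(type=StreamChunkType.STOP),
]

# from_chunks() folds an ordered chunk sequence back into a full response.
response = StreamingLLMResponse.from_chunks(chunks)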

SSE Parser (llm/sse.py, 154 lines)

  • parse_sse_line() / parse_sse_event() — stateless parsers
  • SSEStreamParser — stateful line-buffered parser with feed() generator
  • Handles OpenAI (data-only), Anthropic (event+data), Gemini (JSON-line) formats
  • 31 tests in test_sse_parser.py
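
Usage sketch (whether feed() takes decoded text or raw bytes, and the exact event shape it yields, are assumptions here):

from llm.sse import SSEStreamParser

# Assumed calling convention: feed() buffers partial lines and yields parsed
# events once a complete SSE frame has arrived; handle() is a placeholder.
parser = SSEStreamParser()
for piece in ('data: {"delta": ', '"Hi"}\n\n'):
    for event in parser.feed(piece):
        handle(event)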

Adapter stream() Methods (in adapters.py)

  • OpenAI: lines 418-564 with _openai_parse_event()
  • Anthropic: lines 671-842 with _anthropic_parse_event()
  • Gemini: lines 1019-1165 with _gemini_parse_chunks()
  • 13 tests in test_adapter_streaming.py

What's Broken

Critical: _urlopen_with_retry() incompatibility

_urlopen_with_retry() (lines 100-121) does:

with urllib.request.urlopen(req, timeout=timeout) as resp:
    return json.loads(resp.read().decode("utf-8"))  # BLOCKS, returns dict

The stream() methods call it expecting a context manager:

with _urlopen_with_retry(req, timeout=timeout) as response:  # WILL FAIL
    for line in response:
        ...

This fails at runtime because a dict is not a context manager (AttributeError: __enter__ on older Python versions, TypeError on 3.11+).

Fix needed: A separate _urlopen_streaming() that returns the raw response object for line-by-line iteration. urllib supports for line in resp: natively.
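
A minimal sketch of that helper (the name _urlopen_streaming comes from this issue; retry handling is deliberately omitted because retrying mid-stream is its own risk, listed below):

import urllib.request

def _urlopen_streaming(req, timeout=None):
    # Return the raw HTTPResponse without reading the body, so the adapter
    # stream() methods can iterate it line-by-line exactly as shown above.
    # HTTPResponse is a context manager, so `with _urlopen_streaming(...)` works.
    return urllib.request.urlopen(req, timeout=timeout)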

What's Missing (Layer by Layer)

Layer | File | Status | Work Needed
1. HTTP transport | adapters.py:100-121 | Broken | Create _urlopen_streaming() returning raw response
2. Adapter stream() | adapters.py | Broken (calls wrong fn) | Wire to _urlopen_streaming()
3. LLMClient | llm/client.py:65-176 | Missing | Add stream() method, route to adapter.stream()
4. Strategy | agent/strategies.py:489 | Missing | Consume streaming iterator, emit chunk events
5. Executor | core.py:449-504 | Missing | Propagate streaming via EventCallback
6. MockAdapter | adapters.py:25-96 | Missing | Add stream() method for testing
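
For layers 3 and 6, the wiring could look roughly like this (method signatures and the MockAdapter chunk contents are assumptions; shown synchronous here, while sync vs. async is called out under risk 1 below):

from llm.streaming import StreamChunk, StreamChunkType

# Layer 3 sketch: LLMClient.stream() delegates to whichever adapter is active.
class LLMClient:
    def __init__(self, adapter):
        self._adapter = adapter

    def stream(self, messages, **kwargs):
        # Pass StreamChunk objects through untouched; accumulation happens in
        # the strategy layer (see the integration point below).
        yield from self._adapter.stream(messages, **kwargs)

# Layer 6 sketch: MockAdapter.stream() yielding canned chunks for tests.
class MockAdapter:
    def stream(self, messages, **kwargs):
        yield StreamChunk(type=StreamChunkType.START)
        for piece in ("mock ", "response"):
            yield StreamChunk(type=StreamChunkType.CONTENT, content=piece)
        yield StreamChunk(type=StreamChunkType.STOP)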

Integration Point in Strategies

# Current (strategies.py:489):
response = llm_client.call(...)  # Blocks until full response

# Needed:
if should_stream:
    chunks = []
    async for chunk in llm_client.stream(...):
        chunks.append(chunk)
        _emit_agent_event(context, "AGENT_STREAM_CHUNK", {"chunk": chunk})
    response = StreamingLLMResponse.from_chunks(chunks)
else:
    response = llm_client.call(...)

EventCallback System (Ready for Streaming)

  • EventCallback = Callable[[str, Dict[str, Any]], None] (core.py:54)
  • _emit() method at core.py:359-373 — fire-and-forget with exception swallowing
  • Could emit AGENT_STREAM_CHUNK events with StreamChunk data
  • Builder API: RuntimeBuilder.with_on_event() at builder.py:252
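
Consumer-side sketch (the AGENT_STREAM_CHUNK event name comes from this issue; the payload shape and the builder's fluent return value are assumptions):

from typing import Any, Dict

def on_event(event_type: str, payload: Dict[str, Any]) -> None:
    # Matches the EventCallback signature above; invoked fire-and-forget by _emit().
    if event_type == "AGENT_STREAM_CHUNK":
        chunk = payload.get("chunk")
        print(getattr(chunk, "content", "") or "", end="", flush=True)

# RuntimeBuilder lives in builder.py; chaining off with_on_event() is assumed.
builder = RuntimeBuilder().with_on_event(on_event)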

Risk Areas

  1. urllib blocking I/O vs async: for line in resp: is blocking; needs asyncio.to_thread() or similar (see the sketch after this list)
  2. Retry + streaming: Partial response + error = data loss? Can't retry mid-stream
  3. Tool call reassembly: JSON chunks may split at arbitrary boundaries
  4. Timeout handling: timeout param applies to connection, not per-chunk reads
  5. Memory: Long streams accumulate chunks — need backpressure or limits
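
For risk 1 specifically, one way to keep the blocking urllib iteration off the event loop is to hop each next() call onto a worker thread. This is a sketch under that approach, not the chosen design, and it deliberately ignores the backpressure concern from risk 5:

import asyncio

async def iter_lines_async(response):
    # Bridge blocking `for line in response:` into an async iterator by
    # running each next() on a worker thread (Python 3.9+ asyncio.to_thread).
    it = iter(response)
    sentinel = object()
    while True:
        raw = await asyncio.to_thread(next, it, sentinel)
        if raw is sentinel:  # stream exhausted
            break
        yield raw.decode("utf-8")

# Note: this pays one thread hop per line and does nothing about per-chunk
# read timeouts (risk 4); both need attention in the real implementation.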

TODO Markers

  • adapters.py:324: TODO(roadmap): Support streaming (stream=True) for token-level feedback.
  • adapters.py:577: TODO(roadmap): Add streaming support for token-level feedback.
  • adapters.py:862: TODO(roadmap): Support streaming for token-level feedback.

Priority

P1 — High impact, high effort. ~15-20 hours across 5 layers. The type system foundation is solid; the main work is HTTP transport fix + wiring through client/strategy/executor.


🤖 Generated with Claude Code
