fix(sdk): support async generator functions in control() decorator #116
KazChe wants to merge 1 commit into agentcontrol:main from
Conversation
The control() decorator silently broke on streaming (async generator) functions, the standard pattern for LLM response streaming. This adds an async_gen_wrapper path that runs the pre-check before the first chunk, yields chunks in real time while accumulating output, and runs the post-check on the full accumulated output after the stream completes. fixes agentcontrol#113
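The described wrapper path can be sketched as follows. This is a minimal, self-contained illustration of the pattern, not the SDK's actual code: `pre_check` and `post_check` are stand-ins for the real control checks, and the decorator is simplified to take no arguments.

```python
import asyncio

async def pre_check(args, kwargs):
    # Stand-in: the SDK would evaluate pre-stage controls here and may raise.
    pass

async def post_check(full_output: str):
    # Stand-in: the SDK would evaluate post-stage controls on the output.
    pass

def control(func):
    async def async_gen_wrapper(*args, **kwargs):
        await pre_check(args, kwargs)            # pre-check before first chunk
        accumulated: list[str] = []
        async for chunk in func(*args, **kwargs):
            accumulated.append(str(chunk))       # accumulate for post-check
            yield chunk                          # stream in real time
        await post_check("".join(accumulated))   # post-check on full output
    return async_gen_wrapper

@control
async def stream():
    for part in ("Hel", "lo"):
        yield part

async def main():
    return [c async for c in stream()]

print(asyncio.run(main()))  # ['Hel', 'lo']
```

Because the wrapper itself contains `yield` inside `async def`, it is an async generator function, so callers can iterate it exactly as they would the undecorated function.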
Hi, the sdk-ts-ci check is failing due to a missing SPEAKEASY_API_KEY secret that's not available to fork PRs and is unrelated to the changes in this PR (python sdk only).
```python
# Yield chunks while accumulating full output for post-check
accumulated: list[str] = []
async for chunk in func(*args, **kwargs):
    accumulated.append(str(chunk))
```
`str(chunk)` is lossy: if chunks are dicts, dataclasses, or framework message objects, the original data can't be recovered for the post-check. Can we handle non-string chunks here?
Thanks for pointing this out, we will work on fixing this.
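One possible direction for that fix (illustrative only, not the SDK's actual API): keep the raw chunks and derive text with a type-aware extractor instead of calling `str()` directly. The key names checked below (`content`, `text`, `delta`) are assumptions based on common LLM streaming payload shapes.

```python
from typing import Any

def chunk_text(chunk: Any) -> str:
    """Best-effort text extraction that avoids lossy str() on rich objects."""
    if isinstance(chunk, str):
        return chunk
    if isinstance(chunk, bytes):
        return chunk.decode("utf-8", errors="replace")
    if isinstance(chunk, dict):  # e.g. {"delta": {"content": "..."}}
        for key in ("content", "text", "delta"):
            if key in chunk:
                return chunk_text(chunk[key])
    content = getattr(chunk, "content", None)  # framework message objects
    if content is not None:
        return chunk_text(content)
    return str(chunk)  # last resort, explicitly lossy

print(chunk_text({"delta": {"content": "hi"}}))  # hi
```

Keeping the original chunk objects in the accumulator alongside the extracted text would also let the post-check inspect structured fields, not just the flattened string.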
Please also add tests for:
```python
# PRE-EXECUTION: Check controls with check_stage="pre"
await _run_control_check(ctx, "pre", ctx.pre_payload(), controls)

# Yield chunks while accumulating full output for post-check
```
This path only runs the post-stage check after the `async for` finishes. If the caller breaks early, cancels the task, or the client disconnects mid-stream, Python closes the wrapper and runs its `finally` blocks, but never reaches the post-check. That leaves a real bypass for streaming consumers, which is exactly where partial reads are common. This path needs explicit handling/documentation and tests for break, cancellation, and `aclose()`.
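One way to close that bypass (a sketch of a possible approach, not the merged fix) is to move the post-check into a `finally` block, so it also runs when the generator is closed early, and to flag the accumulated output as partial:

```python
import asyncio

post_checks: list[tuple[str, bool]] = []

async def post_check(output: str, partial: bool):
    # Stand-in for the SDK's post-stage control check.
    post_checks.append((output, partial))

def control(func):
    async def async_gen_wrapper(*args, **kwargs):
        accumulated: list[str] = []
        completed = False
        try:
            async for chunk in func(*args, **kwargs):
                accumulated.append(str(chunk))
                yield chunk
            completed = True
        finally:
            # Runs on normal completion AND on early close/cancellation
            # (GeneratorExit), so a partial read can no longer skip the
            # post-stage check. Awaiting here is legal during aclose() as
            # long as the generator does not yield again.
            await post_check("".join(accumulated), partial=not completed)
    return async_gen_wrapper

@control
async def stream():
    yield "a"
    yield "b"

async def consume_one():
    gen = stream()
    async for chunk in gen:
        break            # caller stops after the first chunk
    await gen.aclose()   # triggers the finally-based post-check

asyncio.run(consume_one())
print(post_checks)  # [('a', True)]
```

A remaining design question is what a post-stage deny should do at that point, since the partial output has already been delivered; logging/alerting rather than raising may be the only meaningful action on the early-exit path.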
```python
# Yield chunks while accumulating full output for post-check
accumulated: list[str] = []
async for chunk in func(*args, **kwargs):
```
Unlike the existing non-streaming wrapper, this implementation yields every chunk to the caller before enforcing the post-stage result. A post-stage deny or steer therefore raises only after the full response has already been delivered, so it no longer provides the same fail-closed behavior as control() on normal async functions. If that tradeoff is intentional, it needs to be called out very clearly; otherwise the stream must be buffered or evaluated chunk-by-chunk.
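The fail-closed alternative the reviewer mentions can be sketched like this (illustrative; `ControlDenied` and the substring policy are stand-ins): buffer the whole stream, run the post-check, and only then yield, trading streaming latency for enforcement before any chunk is delivered.

```python
import asyncio

class ControlDenied(Exception):
    pass

async def post_check(output: str):
    # Stand-in policy: deny if the output contains a forbidden marker.
    if "forbidden" in output:
        raise ControlDenied(output)

def control_buffered(func):
    async def wrapper(*args, **kwargs):
        # Drain the source stream entirely before releasing anything.
        buffered = [chunk async for chunk in func(*args, **kwargs)]
        await post_check("".join(map(str, buffered)))  # enforce first
        for chunk in buffered:                         # then deliver
            yield chunk
    return wrapper

@control_buffered
async def stream():
    yield "ok "
    yield "response"

async def main():
    return "".join([c async for c in stream()])

print(asyncio.run(main()))  # ok response
```

The middle ground is chunk-by-chunk evaluation, which preserves streaming latency but only catches violations that are detectable within the chunks seen so far.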
Summary
This PR fixes the control() decorator, addressing crashes and silent bypasses when applied to async def functions with yield, the standard pattern for LLM response streaming.

Scope
- @control() now correctly wraps async generator functions, preserving inspect.isasyncgenfunction() identity
- New async_gen_wrapper path in control() that runs the pre-check before the first chunk, yields chunks in real time while accumulating, and runs the post-check on the full accumulated output after the stream completes

Risk and Rollout
Testing
make check (lint + typecheck pass; server tests have pre-existing postgres failures unrelated to this change)

Checklist
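As a closing illustration, the Scope item about preserving inspect.isasyncgenfunction() identity can be sketched as follows (an assumption about the approach, not the SDK's exact code): because the wrapper itself is declared with `yield` inside `async def`, introspection still reports an async generator function, and `functools.wraps` carries over the name and docstring.

```python
import functools
import inspect

def control(func):
    if inspect.isasyncgenfunction(func):
        @functools.wraps(func)
        async def async_gen_wrapper(*args, **kwargs):
            # Checks elided; this shows only the identity-preservation shape.
            async for chunk in func(*args, **kwargs):
                yield chunk
        return async_gen_wrapper

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper

@control
async def stream():
    yield "chunk"

print(inspect.isasyncgenfunction(stream))  # True
```

This matters for frameworks that branch on `isasyncgenfunction()` to decide how to consume a handler; a plain coroutine wrapper around an async generator would break that dispatch.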