refactor: responses and messages genai streams integration#92
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Refactors OpenAI and Anthropic streaming wrappers to use shared opentelemetry.util.genai.stream sync/async wrapper base classes, and updates tests to align with the new stream wrapper lifecycle/iteration behavior.
Changes:
- Introduce
_ResponseStreamMixin/_MessagesStreamMixinand rebase wrappers onSyncStreamWrapper/AsyncStreamWrapper. - Update async fake streams in tests to implement
__aiter__and adjust assertions around whenstop()is invoked. - Add dedicated fake sync stream in OpenAI tests to exercise response attribute fallback behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_response_wrappers.py | Updates fakes and expectations to match new shared stream wrapper behavior. |
| instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/response_wrappers.py | Refactors stream wrappers onto shared util wrappers; introduces a mixin for telemetry logic. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_async_wrappers.py | Updates async fake stream + expectations for wrapper finalization behavior. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/src/opentelemetry/instrumentation/genai/anthropic/wrappers.py | Refactors sync/async message stream wrappers onto shared util wrappers with a telemetry mixin. |
| self._finalized = True | ||
| self._self_invocation.fail(exc) | ||
| self._self_message_finalized = True | ||
| self._self_finalized = True |
There was a problem hiding this comment.
Instead of directly modifying the base class attribute, it be useful to have a _set_finalized api in the base class in the future. Having instrumentations have knowledge of the base class attributes is awkward. Probably for a different pr.
There was a problem hiding this comment.
@lzchen I updated the provider-specific flag names to make the distinction clearer:
_self_response_finalized->_self_response_telemetry_finalized_self_message_finalized->_self_message_telemetry_finalized
I also removed the direct writes to the shared base _self_finalized flag from the provider wrappers, so Responses/Messages no longer modify the base class lifecycle attribute directly. The shared SyncStreamWrapper / AsyncStreamWrapper remain the only owners of _self_finalized.
Do you still think we should add a _set_finalized() in the shared API?
| self._fail(str(error), type(error)) | ||
| raise | ||
| with self._safe_instrumentation("event processing"): | ||
| self.process_event(event) |
There was a problem hiding this comment.
This will probably produce a more generic error message instead of the one generated by self._safe_instrumentation now right?
| def _current_message_snapshot( | ||
| self, | ||
| ) -> ParsedMessage[ResponseFormatT] | None: | ||
| return None |
There was a problem hiding this comment.
is this method necessary? Is it correct to always return None here?
also it seems sync tests don't actually validate proper accumulation https://github.com/eternalcuriouslearner/opentelemetry-python-genai/blob/262790f81db55a6a660b8414ddc5d9ae1cc85e5a/instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_sync_messages.py#L469
I don't see any async streaming tests - how do we know it works ? 🤔
Description
Migrate Responses and Messages streams to common gen-ai streams
Fixes # (issue)
Type of change
Please delete options that are not relevant.
How has this been tested?
Please describe the tests that you ran to verify your changes. Provide
instructions so we can reproduce. List any relevant details for your test
configuration.
Checklist
See CONTRIBUTING.md
for the style guide, changelog guidance, and more.