Skip to content

refactor: responses and messages genai streams integration#92

Open
eternalcuriouslearner wants to merge 6 commits into
open-telemetry:mainfrom
eternalcuriouslearner:responses-and-messages-streams-integration
Open

refactor: responses and messages genai streams integration#92
eternalcuriouslearner wants to merge 6 commits into
open-telemetry:mainfrom
eternalcuriouslearner:responses-and-messages-streams-integration

Conversation

@eternalcuriouslearner
Copy link
Copy Markdown
Contributor

Description

Migrate Responses and Messages streams to common gen-ai streams

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How has this been tested?

Please describe the tests that you ran to verify your changes. Provide
instructions so we can reproduce. List any relevant details for your test
configuration.

  • existing tests are running successfully.

Checklist

See CONTRIBUTING.md
for the style guide, changelog guidance, and more.

  • Followed the style guidelines of this project
  • Changelog updated if the change requires an entry
  • Unit tests added
  • Documentation updated

@eternalcuriouslearner eternalcuriouslearner requested a review from a team as a code owner May 26, 2026 15:59
Copilot AI review requested due to automatic review settings May 26, 2026 15:59
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Refactors OpenAI and Anthropic streaming wrappers to use shared opentelemetry.util.genai.stream sync/async wrapper base classes, and updates tests to align with the new stream wrapper lifecycle/iteration behavior.

Changes:

  • Introduce _ResponseStreamMixin / _MessagesStreamMixin and rebase wrappers on SyncStreamWrapper / AsyncStreamWrapper.
  • Update async fake streams in tests to implement __aiter__ and adjust assertions around when stop() is invoked.
  • Add dedicated fake sync stream in OpenAI tests to exercise response attribute fallback behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_response_wrappers.py Updates fakes and expectations to match new shared stream wrapper behavior.
instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/response_wrappers.py Refactors stream wrappers onto shared util wrappers; introduces a mixin for telemetry logic.
instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_async_wrappers.py Updates async fake stream + expectations for wrapper finalization behavior.
instrumentation/opentelemetry-instrumentation-genai-anthropic/src/opentelemetry/instrumentation/genai/anthropic/wrappers.py Refactors sync/async message stream wrappers onto shared util wrappers with a telemetry mixin.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

self._finalized = True
self._self_invocation.fail(exc)
self._self_message_finalized = True
self._self_finalized = True
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of directly modifying the base class attribute, it be useful to have a _set_finalized api in the base class in the future. Having instrumentations have knowledge of the base class attributes is awkward. Probably for a different pr.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lzchen I updated the provider-specific flag names to make the distinction clearer:

  • _self_response_finalized -> _self_response_telemetry_finalized
  • _self_message_finalized -> _self_message_telemetry_finalized

I also removed the direct writes to the shared base _self_finalized flag from the provider wrappers, so Responses/Messages no longer modify the base class lifecycle attribute directly. The shared SyncStreamWrapper / AsyncStreamWrapper remain the only owners of _self_finalized.

Do you still think we should add a _set_finalized() in the shared API?

self._fail(str(error), type(error))
raise
with self._safe_instrumentation("event processing"):
self.process_event(event)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will probably produce a more generic error message instead of the one generated by self._safe_instrumentation now right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @lzchen self._safe_instrumentation is probably an overkill so I removed it. I discussed it with @lmolkova on the anthropic pr and removed it from messages instrumentation. So I am removing it from here.

def _current_message_snapshot(
self,
) -> ParsedMessage[ResponseFormatT] | None:
return None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method necessary? Is it correct to always return None here?

also it seems sync tests don't actually validate proper accumulation https://github.com/eternalcuriouslearner/opentelemetry-python-genai/blob/262790f81db55a6a660b8414ddc5d9ae1cc85e5a/instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_sync_messages.py#L469

I don't see any async streaming tests - how do we know it works ? 🤔

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

4 participants