From 46e58a7015f4754b736d640bcb34a2b000352365 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Tue, 2 Jun 2026 18:16:37 +0800 Subject: [PATCH 1/2] Support OpenAI Responses API instrumentation --- .../CHANGELOG.md | 4 + .../README.rst | 3 +- .../instrumentation/openai_v2/__init__.py | 53 ++ .../instrumentation/openai_v2/patch.py | 124 ++++ .../instrumentation/openai_v2/utils.py | 345 ++++++++++ .../tests/test_responses.py | 632 ++++++++++++++++++ 6 files changed, 1160 insertions(+), 1 deletion(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md index 9132163d9..e7c5e038a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add latest-experimental semantic convention support for OpenAI Responses API + `responses.create`, including sync, async, streaming, token usage, message + content, response status, reasoning metadata, and tool definitions. + ([#209](https://github.com/alibaba/loongsuite-python-agent/issues/209)) - Fix `StreamWrapper` missing `.headers` and other attributes when using `with_raw_response` streaming ([#4113](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4113)) - Add opt-in support for latest experimental semantic conventions (v1.37.0). Set diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/README.rst index 8cf82f426..a429ac210 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/README.rst @@ -57,6 +57,8 @@ Instrumenting all clients ************************* When using the instrumentor, all clients will automatically trace OpenAI operations including chat completions and embeddings. +With `latest experimental features <#enabling-the-latest-experimental-features>`_ enabled, it also traces sync and async +Responses API calls made with ``responses.create``, including ``stream=True`` calls. You can also optionally capture prompts and completions as log events. Make sure to configure OpenTelemetry tracing, logging, and events to capture all telemetry emitted by the instrumentation. @@ -130,4 +132,3 @@ References * `OpenTelemetry OpenAI Instrumentation `_ * `OpenTelemetry Project `_ * `OpenTelemetry Python Examples `_ - diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py index e95908375..e363571a9 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py @@ -40,6 +40,8 @@ --- """ +import logging +from importlib import import_module from typing import Collection from wrapt import wrap_function_wrapper @@ -66,11 +68,43 @@ async_chat_completions_create_v_new, async_chat_completions_create_v_old, async_embeddings_create, + async_responses_create_v_new, chat_completions_create_v_new, chat_completions_create_v_old, embeddings_create, + responses_create_v_new, ) +_logger = logging.getLogger(__name__) + + +def _wrap_function_wrapper_if_available(module, name, wrapper): + try: + imported_module = import_module(module) + target = imported_module + for part in name.split(".")[:-1]: + target = getattr(target, part) + getattr(target, name.split(".")[-1]) + except (AttributeError, ModuleNotFoundError) as exc: + _logger.debug( + "Skipping optional OpenAI wrapper %s.%s: %s", + module, + name, + exc, + ) + return + wrap_function_wrapper(module=module, name=name, wrapper=wrapper) + + +def _unwrap_if_available(module_name, class_name, method_name): + try: + module = import_module(module_name) + except ModuleNotFoundError: + return + cls = getattr(module, class_name, None) + if cls is not None and hasattr(cls, method_name): + unwrap(cls, method_name) + class OpenAIInstrumentor(BaseInstrumentor): def __init__(self): @@ -159,6 +193,19 @@ def _instrument(self, **kwargs): ), ) + if latest_experimental_enabled: + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="Responses.create", + wrapper=responses_create_v_new(handler, content_mode), + ) + + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="AsyncResponses.create", + wrapper=async_responses_create_v_new(handler, content_mode), + ) + def _uninstrument(self, **kwargs): import openai # pylint: disable=import-outside-toplevel # noqa: PLC0415 @@ -166,3 +213,9 @@ def _uninstrument(self, **kwargs): unwrap(openai.resources.chat.completions.AsyncCompletions, "create") unwrap(openai.resources.embeddings.Embeddings, "create") unwrap(openai.resources.embeddings.AsyncEmbeddings, "create") + _unwrap_if_available( + "openai.resources.responses", "Responses", "create" + ) + _unwrap_if_available( + "openai.resources.responses", "AsyncResponses", "create" + ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index a37b62d65..623fef4ec 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines import json from timeit import default_timer @@ -47,10 +48,12 @@ _prepare_output_messages, choice_to_event, create_chat_invocation, + create_response_invocation, get_llm_request_attributes, handle_span_exception, is_streaming, message_to_event, + set_response_invocation_properties, set_span_attribute, ) @@ -265,6 +268,88 @@ async def traced_method(wrapped, instance, args, kwargs): return traced_method +def responses_create_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + def traced_method(wrapped, instance, args, kwargs): + response_invocation = handler.start_llm( + create_response_invocation( + kwargs, instance, capture_content=capture_content + ) + ) + try: + result = wrapped(*args, **kwargs) + if hasattr(result, "parse"): + parsed_result = result.parse() + else: + parsed_result = result + if is_streaming(kwargs): + return ResponsesStreamWrapper( + parsed_result, + handler, + response_invocation, + capture_content, + ) + + set_response_invocation_properties( + response_invocation, parsed_result, capture_content + ) + handler.stop_llm(response_invocation) + return result + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + +def async_responses_create_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + async def traced_method(wrapped, instance, args, kwargs): + response_invocation = handler.start_llm( + create_response_invocation( + kwargs, instance, capture_content=capture_content + ) + ) + try: + result = await wrapped(*args, **kwargs) + if hasattr(result, "parse"): + parsed_result = result.parse() + else: + parsed_result = result + if is_streaming(kwargs): + return ResponsesStreamWrapper( + parsed_result, + handler, + response_invocation, + capture_content, + ) + + set_response_invocation_properties( + response_invocation, parsed_result, capture_content + ) + handler.stop_llm(response_invocation) + return result + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + def embeddings_create( tracer: Tracer, instruments: Instruments, @@ -954,3 +1039,42 @@ def cleanup(self, error: Optional[BaseException] = None): else: self.handler.stop_llm(self.invocation) self._started = False + + +class ResponsesStreamWrapper(BaseStreamWrapper): + handler: TelemetryHandler + invocation: LLMInvocation + + def __init__( + self, + stream: Stream, + handler: TelemetryHandler, + invocation: LLMInvocation, + capture_content: bool, + ): + super().__init__(stream, capture_content=capture_content) + self.handler = handler + self.invocation = invocation + self.final_response = None + + def process_chunk(self, chunk): + response = getattr(chunk, "response", None) + if response is not None: + self.final_response = response + + def cleanup(self, error: Optional[BaseException] = None): + if not self._started: + return + + if self.final_response is not None: + set_response_invocation_properties( + self.invocation, self.final_response, self.capture_content + ) + + if error: + self.handler.fail_llm( + self.invocation, Error(type=type(error), message=str(error)) + ) + else: + self.handler.stop_llm(self.invocation) + self._started = False diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index 7e5d2307c..d30780fd3 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -37,18 +37,35 @@ ) from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.genai.types import ( + File, + FunctionToolDefinition, + GenericToolDefinition, InputMessage, LLMInvocation, OutputMessage, Text, ToolCall, ToolCallResponse, + Uri, ) OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = ( "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT" ) +_RESPONSE_TOOL_CALL_ITEM_TYPES = { + "code_interpreter_call", + "computer_call", + "custom_tool_call", + "file_search_call", + "function_call", + "image_generation_call", + "local_shell_call", + "mcp_call", + "tool_call", + "web_search_call", +} + def is_content_enabled() -> bool: capture_content = environ.get( @@ -401,6 +418,334 @@ def create_chat_invocation( return llm_invocation +def create_response_invocation( + kwargs, + client_instance, + capture_content: bool, +) -> LLMInvocation: + llm_invocation = LLMInvocation(request_model=kwargs.get("model", "")) + llm_invocation.provider = ( + GenAIAttributes.GenAiProviderNameValues.OPENAI.value + ) + llm_invocation.temperature = get_value(kwargs.get("temperature")) + llm_invocation.top_p = get_value(kwargs.get("top_p")) + llm_invocation.max_tokens = get_value(kwargs.get("max_output_tokens")) + + text_config = get_value(kwargs.get("text")) + output_type = _get_response_output_type(text_config) + if output_type: + llm_invocation.output_type = output_type + + address, port = get_server_address_and_port(client_instance) + if address: + llm_invocation.server_address = address + if port: + llm_invocation.server_port = port + + conversation_id = _get_response_conversation_id(kwargs.get("conversation")) + if conversation_id: + llm_invocation.conversation_id = conversation_id + + reasoning = get_value(kwargs.get("reasoning")) + attributes = {} + metric_attributes = {} + + service_tier = get_value(kwargs.get("service_tier")) + if service_tier and service_tier != "auto": + attributes[OpenAIAttributes.OPENAI_REQUEST_SERVICE_TIER] = service_tier + metric_attributes[OpenAIAttributes.OPENAI_REQUEST_SERVICE_TIER] = ( + service_tier + ) + + _set_optional_attribute( + attributes, + "gen_ai.openai.request.previous_response_id", + get_value(kwargs.get("previous_response_id")), + ) + _set_optional_attribute( + attributes, + "gen_ai.openai.request.background", + get_value(kwargs.get("background")), + ) + _set_optional_attribute( + attributes, + "gen_ai.openai.request.store", + get_value(kwargs.get("store")), + ) + _set_optional_attribute( + attributes, + "gen_ai.openai.request.parallel_tool_calls", + get_value(kwargs.get("parallel_tool_calls")), + ) + _set_optional_attribute( + attributes, + "gen_ai.openai.request.reasoning.effort", + _get_mapping_or_attr(reasoning, "effort"), + ) + _set_optional_attribute( + attributes, + "gen_ai.openai.request.reasoning.summary", + _get_mapping_or_attr(reasoning, "summary"), + ) + + if attributes: + llm_invocation.attributes = attributes + if metric_attributes: + llm_invocation.metric_attributes = metric_attributes + + if capture_content: + llm_invocation.input_messages = _prepare_response_input_messages( + kwargs.get("input") + ) + if (instructions := get_value(kwargs.get("instructions"))) is not None: + llm_invocation.system_instruction = [ + Text(content=str(instructions)) + ] + + llm_invocation.tool_definitions = _prepare_response_tool_definitions( + kwargs.get("tools") + ) + + return llm_invocation + + +def _set_optional_attribute(attributes, key, value): + if value is not None: + attributes[key] = value + + +def _get_mapping_or_attr(obj, key): + if obj is None: + return None + if isinstance(obj, Mapping): + return get_value(obj.get(key)) + return get_value(getattr(obj, key, None)) + + +def _get_response_output_type(text_config): + response_format = _get_mapping_or_attr(text_config, "format") + if response_format is None: + return None + response_format_type = _get_mapping_or_attr(response_format, "type") + if response_format_type in {"json_schema", "json_object"}: + return GenAIAttributes.GenAiOutputTypeValues.JSON.value + if response_format_type: + return response_format_type + return None + + +def _get_response_conversation_id(conversation): + if conversation is None: + return None + if isinstance(conversation, str): + return conversation + conversation_id = _get_mapping_or_attr(conversation, "id") + if conversation_id: + return conversation_id + return None + + +def _prepare_response_tool_definitions(tools) -> list: + if not value_is_set(tools): + return [] + definitions = [] + for tool in tools or []: + tool_type = get_property_value(tool, "type") or "function" + if tool_type == "function": + function = get_property_value(tool, "function") or tool + definitions.append( + FunctionToolDefinition( + name=get_property_value(function, "name") or "", + description=get_property_value(function, "description"), + parameters=get_property_value(function, "parameters"), + ) + ) + else: + definitions.append( + GenericToolDefinition(name=str(tool_type), type=str(tool_type)) + ) + return definitions + + +def _prepare_response_input_messages(input_value) -> list[InputMessage]: + if not value_is_set(input_value): + return [] + if isinstance(input_value, str): + return [InputMessage(role="user", parts=[Text(content=input_value)])] + + messages = [] + if not isinstance(input_value, Iterable) or isinstance( + input_value, (str, bytes) + ): + return messages + + for item in input_value: + item_type = get_property_value(item, "type") + if item_type and item_type not in {"message", "input_message"}: + continue + role = get_property_value(item, "role") or "user" + content = get_property_value(item, "content") + parts = _prepare_response_message_parts(content) + if parts: + messages.append(InputMessage(role=str(role), parts=parts)) + return messages + + +def _prepare_response_message_parts(content) -> list: + if content is None: + return [] + if isinstance(content, str): + return [Text(content=content)] + if not isinstance(content, Iterable) or isinstance(content, (str, bytes)): + return [Text(content=str(content))] + + parts = [] + for part in content: + part_type = get_property_value(part, "type") + text = get_property_value(part, "text") + if part_type in {"input_text", "output_text", "text"} and text: + parts.append(Text(content=str(text))) + continue + + image_url = get_property_value(part, "image_url") + file_id = get_property_value(part, "file_id") + if image_url: + parts.append( + Uri(mime_type=None, modality="image", uri=str(image_url)) + ) + elif file_id: + parts.append( + File(mime_type=None, modality="file", file_id=str(file_id)) + ) + return parts + + +def set_response_invocation_properties( + invocation: LLMInvocation, + result, + capture_content: bool, +) -> LLMInvocation: + if getattr(result, "model", None): + invocation.response_model_name = result.model + if getattr(result, "id", None): + invocation.response_id = result.id + + status = getattr(result, "status", None) + finish_reason = _response_finish_reason(result) + if finish_reason: + invocation.finish_reasons = [finish_reason] + if status: + invocation.attributes["gen_ai.openai.response.status"] = status + + if getattr(result, "service_tier", None): + invocation.attributes[ + OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER + ] = result.service_tier + invocation.metric_attributes[ + OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER + ] = result.service_tier + + usage = getattr(result, "usage", None) + if usage: + invocation.input_tokens = _get_mapping_or_attr(usage, "input_tokens") + invocation.output_tokens = _get_mapping_or_attr(usage, "output_tokens") + input_details = _get_mapping_or_attr(usage, "input_tokens_details") + cached_tokens = _get_mapping_or_attr(input_details, "cached_tokens") + if cached_tokens is not None: + invocation.usage_cache_read_input_tokens = cached_tokens + + output_details = _get_mapping_or_attr(usage, "output_tokens_details") + reasoning_tokens = _get_mapping_or_attr( + output_details, "reasoning_tokens" + ) + if reasoning_tokens is not None: + invocation.attributes[ + "gen_ai.usage.output_tokens_details.reasoning_tokens" + ] = reasoning_tokens + + if capture_content: + invocation.output_messages = _prepare_response_output_messages(result) + return invocation + + +def _response_finish_reason(result) -> str | None: + status = getattr(result, "status", None) + if status == "completed": + for item in getattr(result, "output", []) or []: + item_type = get_property_value(item, "type") + if _is_response_tool_call_item(item_type): + return "tool_calls" + return "stop" + if status == "incomplete": + details = getattr(result, "incomplete_details", None) + reason = _get_mapping_or_attr(details, "reason") + if reason == "content_filter": + return "content_filter" + return "length" + if status in {"failed", "cancelled"}: + return "error" + return None + + +def _prepare_response_output_messages(result) -> list[OutputMessage]: + output_messages = [] + finish_reason = _response_finish_reason(result) or "stop" + for item in getattr(result, "output", []) or []: + item_type = get_property_value(item, "type") + if item_type == "message": + parts = _prepare_response_message_parts( + get_property_value(item, "content") + ) + if parts: + output_messages.append( + OutputMessage( + role=get_property_value(item, "role") or "assistant", + parts=parts, + finish_reason=finish_reason, + ) + ) + continue + + tool_call = _prepare_response_tool_call(item) + if tool_call: + output_messages.append( + OutputMessage( + role="assistant", + parts=[tool_call], + finish_reason=finish_reason, + ) + ) + return output_messages + + +def _prepare_response_tool_call(item): + item_type = get_property_value(item, "type") + if not _is_response_tool_call_item(item_type): + return None + + call_id = ( + get_property_value(item, "call_id") + or get_property_value(item, "id") + or get_property_value(item, "item_id") + ) + name = ( + get_property_value(item, "name") + or get_property_value(item, "tool_name") + or item_type + ) + arguments = get_property_value(item, "arguments") + if isinstance(arguments, str): + try: + arguments = json.loads(arguments) + except json.JSONDecodeError: + pass + return ToolCall(id=call_id, name=str(name), arguments=arguments) + + +def _is_response_tool_call_item(item_type) -> bool: + return item_type in _RESPONSE_TOOL_CALL_ITEM_TYPES + + def get_value(v: Any): if value_is_set(v): return v diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py new file mode 100644 index 000000000..9dc6b21a6 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py @@ -0,0 +1,632 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +from types import SimpleNamespace + +import pytest +from openai import AsyncOpenAI, OpenAI + +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) +from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor +from opentelemetry.semconv._incubating.attributes import ( + error_attributes as ErrorAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + openai_attributes as OpenAIAttributes, +) +from opentelemetry.trace.status import StatusCode +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, +) + +from .test_utils import DEFAULT_MODEL, assert_messages_attribute + +responses_resources = pytest.importorskip("openai.resources.responses") +AsyncResponses = responses_resources.AsyncResponses +Responses = responses_resources.Responses + + +def _response(): + return SimpleNamespace( + id="resp_123", + model=DEFAULT_MODEL, + status="completed", + service_tier="default", + output=[ + SimpleNamespace( + type="message", + role="assistant", + content=[ + SimpleNamespace( + type="output_text", + text="This is a Responses API test.", + ) + ], + ) + ], + usage=SimpleNamespace( + input_tokens=11, + output_tokens=7, + input_tokens_details=SimpleNamespace(cached_tokens=3), + output_tokens_details=SimpleNamespace(reasoning_tokens=2), + ), + ) + + +def _tool_response(): + return SimpleNamespace( + id="resp_123", + model=DEFAULT_MODEL, + status="completed", + output=[ + SimpleNamespace(type="reasoning", summary=[]), + SimpleNamespace( + type="function_call", + call_id="call_1", + name="lookup_weather", + arguments='{"city": "Seattle"}', + ), + ], + usage=SimpleNamespace(input_tokens=11, output_tokens=7), + ) + + +def _response_with_status(status, incomplete_reason=None): + incomplete_details = None + if incomplete_reason: + incomplete_details = SimpleNamespace(reason=incomplete_reason) + return SimpleNamespace( + id="resp_123", + model=DEFAULT_MODEL, + status=status, + output=[], + incomplete_details=incomplete_details, + usage=SimpleNamespace(input_tokens=11, output_tokens=7), + ) + + +class _RawResponse: + def __init__(self, parsed_response): + self._parsed_response = parsed_response + + def parse(self): + return self._parsed_response + + +class _ResponseStream: + def __init__(self, events): + self._events = iter(events) + self.closed = False + + def __iter__(self): + return self + + def __next__(self): + event = next(self._events) + if isinstance(event, BaseException): + raise event + return event + + def close(self): + self.closed = True + + +class _AsyncResponseStream: + def __init__(self, events): + self._events = iter(events) + self.closed = False + + def __aiter__(self): + return self + + async def __anext__(self): + try: + event = next(self._events) + except StopIteration as error: + raise StopAsyncIteration from error + if isinstance(event, BaseException): + raise event + return event + + async def close(self): + self.closed = True + + +def _instrument( + tracer_provider, + logger_provider, + meter_provider, + content_capture_mode="span_only", +): + _OpenTelemetrySemanticConventionStability._initialized = False + os.environ[OTEL_SEMCONV_STABILITY_OPT_IN] = "gen_ai_latest_experimental" + if content_capture_mode is None: + os.environ.pop( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, None + ) + else: + os.environ[OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT] = ( + content_capture_mode + ) + instrumentor = OpenAIInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + ) + return instrumentor + + +def _cleanup(instrumentor): + os.environ.pop(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, None) + os.environ.pop(OTEL_SEMCONV_STABILITY_OPT_IN, None) + instrumentor.uninstrument() + _OpenTelemetrySemanticConventionStability._initialized = False + + +def _assert_response_span(span): + assert span.name == f"chat {DEFAULT_MODEL}" + assert ( + span.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] + == GenAIAttributes.GenAiOperationNameValues.CHAT.value + ) + assert span.attributes[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "openai" + assert ( + span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == DEFAULT_MODEL + ) + assert span.attributes[GenAIAttributes.GEN_AI_RESPONSE_ID] == "resp_123" + assert ( + span.attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] == DEFAULT_MODEL + ) + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 11 + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 7 + assert span.attributes["gen_ai.openai.response.status"] == "completed" + assert ( + span.attributes[OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER] + == "default" + ) + assert span.attributes["gen_ai.usage.cache_read.input_tokens"] == 3 + assert ( + span.attributes["gen_ai.usage.output_tokens_details.reasoning_tokens"] + == 2 + ) + assert span.attributes[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ( + "stop", + ) + + assert_messages_attribute( + span.attributes[GenAIAttributes.GEN_AI_INPUT_MESSAGES], + [ + { + "role": "user", + "parts": [{"type": "text", "content": "Say this is a test"}], + } + ], + ) + assert json.loads( + span.attributes[GenAIAttributes.GEN_AI_SYSTEM_INSTRUCTIONS] + ) == [{"type": "text", "content": "You are concise."}] + assert_messages_attribute( + span.attributes[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES], + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "This is a Responses API test.", + } + ], + "finish_reason": "stop", + } + ], + ) + + +def test_responses_create_with_content( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _response() + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input=[ + { + "type": "message", + "role": "user", + "content": [ + { + "type": "input_text", + "text": "Say this is a test", + } + ], + } + ], + service_tier="default", + reasoning={"effort": "low", "summary": "auto"}, + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + assert span.attributes["gen_ai.openai.request.reasoning.effort"] == "low" + assert span.attributes["gen_ai.openai.request.reasoning.summary"] == "auto" + + +@pytest.mark.asyncio +async def test_async_responses_create_with_content( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + async def fake_create(self, **kwargs): + return _response() + + monkeypatch.setattr(AsyncResponses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + await getattr(AsyncOpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + + +def test_responses_create_with_raw_response( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _RawResponse(_response()) + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + raw_response = getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + assert isinstance(raw_response, _RawResponse) + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + + +@pytest.mark.parametrize( + ("status", "incomplete_reason", "finish_reason"), + [ + ("incomplete", None, "length"), + ("incomplete", "content_filter", "content_filter"), + ("failed", None, "error"), + ("cancelled", None, "error"), + ], +) +def test_responses_create_finish_reason_status_mapping( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, + status, + incomplete_reason, + finish_reason, +): + def fake_create(self, **kwargs): + return _response_with_status(status, incomplete_reason) + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.attributes[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ( + finish_reason, + ) + + +@pytest.mark.asyncio +async def test_async_responses_create_streaming( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + async def fake_create(self, **kwargs): + return _AsyncResponseStream( + [ + SimpleNamespace(type="response.created"), + SimpleNamespace( + type="response.completed", response=_response() + ), + ] + ) + + monkeypatch.setattr(AsyncResponses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + stream = await getattr(AsyncOpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + stream=True, + ) + async for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + + +def test_responses_create_streaming_error( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _ResponseStream( + [ + SimpleNamespace( + type="response.created", + response=SimpleNamespace( + id="resp_started", + model=DEFAULT_MODEL, + status="in_progress", + ), + ), + RuntimeError("stream failed"), + ] + ) + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + stream = getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + input="Say this is a test", + stream=True, + ) + with pytest.raises(RuntimeError, match="stream failed"): + for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.status.status_code == StatusCode.ERROR + assert span.attributes[ErrorAttributes.ERROR_TYPE] == "RuntimeError" + assert ( + span.attributes[GenAIAttributes.GEN_AI_RESPONSE_ID] == "resp_started" + ) + assert ( + span.attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] == DEFAULT_MODEL + ) + + +def test_responses_create_error( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + raise RuntimeError("responses failed") + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + with pytest.raises(RuntimeError, match="responses failed"): + getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.status.status_code == StatusCode.ERROR + assert span.attributes[ErrorAttributes.ERROR_TYPE] == "RuntimeError" + assert ( + span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == DEFAULT_MODEL + ) + + +def test_responses_create_no_content( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _response() + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, + logger_provider, + meter_provider, + content_capture_mode=None, + ) + try: + getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert GenAIAttributes.GEN_AI_INPUT_MESSAGES not in span.attributes + assert GenAIAttributes.GEN_AI_OUTPUT_MESSAGES not in span.attributes + assert GenAIAttributes.GEN_AI_SYSTEM_INSTRUCTIONS not in span.attributes + + +def test_responses_create_tool_call_skips_reasoning_items( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _tool_response() + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + input="Say this is a test", + tools=[ + { + "type": "function", + "name": "lookup_weather", + "description": "Get weather.", + "parameters": {"type": "object"}, + } + ], + ) + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.attributes[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ( + "tool_calls", + ) + output_messages = json.loads( + span.attributes[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES] + ) + assert len(output_messages) == 1 + assert_messages_attribute( + span.attributes[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES], + [ + { + "role": "assistant", + "parts": [ + { + "type": "tool_call", + "id": "call_1", + "name": "lookup_weather", + "arguments": {"city": "Seattle"}, + } + ], + "finish_reason": "tool_calls", + } + ], + ) + assert json.loads(span.attributes["gen_ai.tool.definitions"]) == [ + { + "name": "lookup_weather", + "description": "Get weather.", + "parameters": {"type": "object"}, + "type": "function", + } + ] + + +def test_responses_create_streaming( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _ResponseStream( + [ + SimpleNamespace(type="response.created"), + SimpleNamespace( + type="response.completed", response=_response() + ), + ] + ) + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + stream = getattr(OpenAI(), "responses").create( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + stream=True, + ) + for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) From eb58e0e47e3318cc62c1896b32ffd72367e36f20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Wed, 3 Jun 2026 19:47:25 +0800 Subject: [PATCH 2/2] fix: cover OpenAI Responses helper instrumentation --- .../instrumentation/openai_v2/__init__.py | 36 ++ .../instrumentation/openai_v2/patch.py | 158 ++++++++ .../instrumentation/openai_v2/utils.py | 11 +- .../tests/test_responses.py | 343 ++++++++++++++++-- 4 files changed, 522 insertions(+), 26 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py index e363571a9..ab554bce2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py @@ -69,10 +69,14 @@ async_chat_completions_create_v_old, async_embeddings_create, async_responses_create_v_new, + async_responses_parse_v_new, + async_responses_retrieve_v_new, chat_completions_create_v_new, chat_completions_create_v_old, embeddings_create, responses_create_v_new, + responses_parse_v_new, + responses_retrieve_v_new, ) _logger = logging.getLogger(__name__) @@ -199,12 +203,32 @@ def _instrument(self, **kwargs): name="Responses.create", wrapper=responses_create_v_new(handler, content_mode), ) + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="Responses.parse", + wrapper=responses_parse_v_new(handler, content_mode), + ) + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="Responses.retrieve", + wrapper=responses_retrieve_v_new(handler, content_mode), + ) _wrap_function_wrapper_if_available( module="openai.resources.responses", name="AsyncResponses.create", wrapper=async_responses_create_v_new(handler, content_mode), ) + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="AsyncResponses.parse", + wrapper=async_responses_parse_v_new(handler, content_mode), + ) + _wrap_function_wrapper_if_available( + module="openai.resources.responses", + name="AsyncResponses.retrieve", + wrapper=async_responses_retrieve_v_new(handler, content_mode), + ) def _uninstrument(self, **kwargs): import openai # pylint: disable=import-outside-toplevel # noqa: PLC0415 @@ -216,6 +240,18 @@ def _uninstrument(self, **kwargs): _unwrap_if_available( "openai.resources.responses", "Responses", "create" ) + _unwrap_if_available( + "openai.resources.responses", "Responses", "parse" + ) + _unwrap_if_available( + "openai.resources.responses", "Responses", "retrieve" + ) _unwrap_if_available( "openai.resources.responses", "AsyncResponses", "create" ) + _unwrap_if_available( + "openai.resources.responses", "AsyncResponses", "parse" + ) + _unwrap_if_available( + "openai.resources.responses", "AsyncResponses", "retrieve" + ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 623fef4ec..df1fd73cf 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -309,6 +309,89 @@ def traced_method(wrapped, instance, args, kwargs): return traced_method +def responses_parse_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + def traced_method(wrapped, instance, args, kwargs): + response_invocation = handler.start_llm( + create_response_invocation( + kwargs, instance, capture_content=capture_content + ) + ) + try: + result = wrapped(*args, **kwargs) + if is_streaming(kwargs): + return ResponsesStreamWrapper( + result, + handler, + response_invocation, + capture_content, + ) + + set_response_invocation_properties( + response_invocation, result, capture_content + ) + handler.stop_llm(response_invocation) + return result + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + +def responses_retrieve_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + def traced_method(wrapped, instance, args, kwargs): + if not is_streaming(kwargs): + return wrapped(*args, **kwargs) + + response_invocation = handler.start_llm( + create_response_invocation( + { + "previous_response_id": _get_response_id(args, kwargs), + "stream": kwargs.get("stream"), + }, + instance, + capture_content=capture_content, + ) + ) + try: + result = wrapped(*args, **kwargs) + return ResponsesStreamWrapper( + result, + handler, + response_invocation, + capture_content, + ) + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + +def _get_response_id(args, kwargs): + if "response_id" in kwargs: + return kwargs["response_id"] + if args: + return args[0] + return None + + def async_responses_create_v_new( handler: TelemetryHandler, content_capturing_mode: ContentCapturingMode, @@ -350,6 +433,81 @@ async def traced_method(wrapped, instance, args, kwargs): return traced_method +def async_responses_parse_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + async def traced_method(wrapped, instance, args, kwargs): + response_invocation = handler.start_llm( + create_response_invocation( + kwargs, instance, capture_content=capture_content + ) + ) + try: + result = await wrapped(*args, **kwargs) + if is_streaming(kwargs): + return ResponsesStreamWrapper( + result, + handler, + response_invocation, + capture_content, + ) + + set_response_invocation_properties( + response_invocation, result, capture_content + ) + handler.stop_llm(response_invocation) + return result + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + +def async_responses_retrieve_v_new( + handler: TelemetryHandler, + content_capturing_mode: ContentCapturingMode, +): + capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + + async def traced_method(wrapped, instance, args, kwargs): + if not is_streaming(kwargs): + return await wrapped(*args, **kwargs) + + response_invocation = handler.start_llm( + create_response_invocation( + { + "previous_response_id": _get_response_id(args, kwargs), + "stream": kwargs.get("stream"), + }, + instance, + capture_content=capture_content, + ) + ) + try: + result = await wrapped(*args, **kwargs) + return ResponsesStreamWrapper( + result, + handler, + response_invocation, + capture_content, + ) + except Exception as error: + handler.fail_llm( + response_invocation, + Error(type=type(error), message=str(error)), + ) + raise + + return traced_method + + def embeddings_create( tracer: Tracer, instruments: Instruments, diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index d30780fd3..ad7090962 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -21,6 +21,7 @@ from httpx import URL from openai import NotGiven +from openai._types import Omit from opentelemetry._logs import LogRecord from opentelemetry.semconv._incubating.attributes import ( @@ -219,7 +220,7 @@ def non_numerical_value_is_set(value: bool | str | NotGiven | None): def value_is_set(value): - return value is not None and not isinstance(value, NotGiven) + return value is not None and not isinstance(value, (NotGiven, Omit)) def get_llm_request_attributes( @@ -488,10 +489,8 @@ def create_response_invocation( _get_mapping_or_attr(reasoning, "summary"), ) - if attributes: - llm_invocation.attributes = attributes - if metric_attributes: - llm_invocation.metric_attributes = metric_attributes + llm_invocation.attributes = attributes + llm_invocation.metric_attributes = metric_attributes if capture_content: llm_invocation.input_messages = _prepare_response_input_messages( @@ -625,6 +624,8 @@ def set_response_invocation_properties( result, capture_content: bool, ) -> LLMInvocation: + if not invocation.request_model and getattr(result, "model", None): + invocation.request_model = result.model if getattr(result, "model", None): invocation.response_model_name = result.model if getattr(result, "id", None): diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py index 9dc6b21a6..e6d40dc8c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_responses.py @@ -45,65 +45,124 @@ Responses = responses_resources.Responses +def _to_dict(value): + if isinstance(value, SimpleNamespace): + return { + key: _to_dict(item) + for key, item in vars(value).items() + if not key.startswith("_") + } + if isinstance(value, list): + return [_to_dict(item) for item in value] + if isinstance(value, tuple): + return tuple(_to_dict(item) for item in value) + if isinstance(value, dict): + return {key: _to_dict(item) for key, item in value.items()} + return value + + +class _ResponseObject(SimpleNamespace): + def to_dict(self): + return _to_dict(self) + + +def _obj(**kwargs): + return _ResponseObject(**kwargs) + + def _response(): - return SimpleNamespace( + return _obj( id="resp_123", model=DEFAULT_MODEL, status="completed", service_tier="default", output=[ - SimpleNamespace( + _obj( type="message", role="assistant", content=[ - SimpleNamespace( + _obj( type="output_text", text="This is a Responses API test.", ) ], ) ], - usage=SimpleNamespace( + usage=_obj( input_tokens=11, output_tokens=7, - input_tokens_details=SimpleNamespace(cached_tokens=3), - output_tokens_details=SimpleNamespace(reasoning_tokens=2), + input_tokens_details=_obj(cached_tokens=3), + output_tokens_details=_obj(reasoning_tokens=2), ), ) def _tool_response(): - return SimpleNamespace( + return _obj( id="resp_123", model=DEFAULT_MODEL, status="completed", output=[ - SimpleNamespace(type="reasoning", summary=[]), - SimpleNamespace( + _obj(type="reasoning", summary=[]), + _obj( type="function_call", call_id="call_1", name="lookup_weather", arguments='{"city": "Seattle"}', ), ], - usage=SimpleNamespace(input_tokens=11, output_tokens=7), + usage=_obj(input_tokens=11, output_tokens=7), ) def _response_with_status(status, incomplete_reason=None): incomplete_details = None if incomplete_reason: - incomplete_details = SimpleNamespace(reason=incomplete_reason) - return SimpleNamespace( + incomplete_details = _obj(reason=incomplete_reason) + return _obj( id="resp_123", model=DEFAULT_MODEL, status=status, output=[], incomplete_details=incomplete_details, - usage=SimpleNamespace(input_tokens=11, output_tokens=7), + usage=_obj(input_tokens=11, output_tokens=7), + ) + + +def _created_response(): + return _obj( + id="resp_123", + model=DEFAULT_MODEL, + status="in_progress", + output=[], + usage=None, + ) + + +def _response_created_event(sequence_number=0): + return SimpleNamespace( + type="response.created", + sequence_number=sequence_number, + response=_created_response(), ) +def _response_completed_event(sequence_number=1): + return SimpleNamespace( + type="response.completed", + sequence_number=sequence_number, + response=_response(), + ) + + +class _RawStreamResponse: + def close(self): + pass + + async def aclose(self): + pass + + class _RawResponse: def __init__(self, parsed_response): self._parsed_response = parsed_response @@ -116,6 +175,7 @@ class _ResponseStream: def __init__(self, events): self._events = iter(events) self.closed = False + self.response = _RawStreamResponse() def __iter__(self): return self @@ -134,6 +194,7 @@ class _AsyncResponseStream: def __init__(self, events): self._events = iter(events) self.closed = False + self.response = _RawStreamResponse() def __aiter__(self): return self @@ -391,10 +452,8 @@ async def test_async_responses_create_streaming( async def fake_create(self, **kwargs): return _AsyncResponseStream( [ - SimpleNamespace(type="response.created"), - SimpleNamespace( - type="response.completed", response=_response() - ), + _response_created_event(), + _response_completed_event(), ] ) @@ -605,10 +664,8 @@ def test_responses_create_streaming( def fake_create(self, **kwargs): return _ResponseStream( [ - SimpleNamespace(type="response.created"), - SimpleNamespace( - type="response.completed", response=_response() - ), + _response_created_event(), + _response_completed_event(), ] ) @@ -630,3 +687,247 @@ def fake_create(self, **kwargs): (span,) = span_exporter.get_finished_spans() _assert_response_span(span) + + +def test_responses_stream_helper_filters_omit_sentinels( + monkeypatch, + caplog, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_create(self, **kwargs): + return _ResponseStream( + [ + _response_created_event(), + _response_completed_event(), + ] + ) + + monkeypatch.setattr(Responses, "create", fake_create) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + with caplog.at_level("WARNING"): + with getattr(OpenAI(), "responses").stream( + model=DEFAULT_MODEL, + input="Say this is a test", + ) as stream: + for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert ( + span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == DEFAULT_MODEL + ) + assert "gen_ai.request.temperature" not in span.attributes + assert "gen_ai.request.top_p" not in span.attributes + assert "gen_ai.openai.request.previous_response_id" not in span.attributes + assert "gen_ai.openai.request.background" not in span.attributes + assert "gen_ai.openai.request.store" not in span.attributes + assert "gen_ai.openai.request.parallel_tool_calls" not in span.attributes + assert not any( + "Omit" in record.message or "Invalid type" in record.message + for record in caplog.records + ) + + +def test_responses_stream_existing_response_uses_retrieve( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_retrieve(self, response_id, **kwargs): + assert response_id == "resp_existing" + assert kwargs["stream"] is True + return _ResponseStream( + [ + _response_created_event(), + _response_completed_event(), + ] + ) + + monkeypatch.setattr(Responses, "retrieve", fake_retrieve) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + with getattr(OpenAI(), "responses").stream( + response_id="resp_existing" + ) as stream: + for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.name == f"chat {DEFAULT_MODEL}" + assert ( + span.attributes["gen_ai.openai.request.previous_response_id"] + == "resp_existing" + ) + assert ( + span.attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] == DEFAULT_MODEL + ) + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 11 + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 7 + + +def test_responses_retrieve_non_streaming_does_not_create_span( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_retrieve(self, response_id, **kwargs): + assert response_id == "resp_existing" + assert "stream" not in kwargs + return _response() + + monkeypatch.setattr(Responses, "retrieve", fake_retrieve) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + result = getattr(OpenAI(), "responses").retrieve("resp_existing") + finally: + _cleanup(instrumentor) + + assert result.id == "resp_123" + assert not span_exporter.get_finished_spans() + + +def test_responses_parse_with_content( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + def fake_parse(self, **kwargs): + return _response() + + monkeypatch.setattr(Responses, "parse", fake_parse) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + parsed = getattr(OpenAI(), "responses").parse( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + assert parsed.id == "resp_123" + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + + +@pytest.mark.asyncio +async def test_async_responses_parse_with_content( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + async def fake_parse(self, **kwargs): + return _response() + + monkeypatch.setattr(AsyncResponses, "parse", fake_parse) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + parsed = await getattr(AsyncOpenAI(), "responses").parse( + model=DEFAULT_MODEL, + instructions="You are concise.", + input="Say this is a test", + ) + finally: + _cleanup(instrumentor) + + assert parsed.id == "resp_123" + (span,) = span_exporter.get_finished_spans() + _assert_response_span(span) + + +@pytest.mark.asyncio +async def test_async_responses_stream_existing_response_uses_retrieve( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + async def fake_retrieve(self, response_id, **kwargs): + assert response_id == "resp_existing" + assert kwargs["stream"] is True + return _AsyncResponseStream( + [ + _response_created_event(), + _response_completed_event(), + ] + ) + + monkeypatch.setattr(AsyncResponses, "retrieve", fake_retrieve) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + async with getattr(AsyncOpenAI(), "responses").stream( + response_id="resp_existing" + ) as stream: + async for _ in stream: + pass + finally: + _cleanup(instrumentor) + + (span,) = span_exporter.get_finished_spans() + assert span.name == f"chat {DEFAULT_MODEL}" + assert ( + span.attributes["gen_ai.openai.request.previous_response_id"] + == "resp_existing" + ) + assert ( + span.attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] == DEFAULT_MODEL + ) + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 11 + assert span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 7 + + +@pytest.mark.asyncio +async def test_async_responses_retrieve_non_streaming_does_not_create_span( + monkeypatch, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, +): + async def fake_retrieve(self, response_id, **kwargs): + assert response_id == "resp_existing" + assert "stream" not in kwargs + return _response() + + monkeypatch.setattr(AsyncResponses, "retrieve", fake_retrieve) + instrumentor = _instrument( + tracer_provider, logger_provider, meter_provider + ) + try: + result = await getattr(AsyncOpenAI(), "responses").retrieve( + "resp_existing" + ) + finally: + _cleanup(instrumentor) + + assert result.id == "resp_123" + assert not span_exporter.get_finished_spans()