diff --git a/instrumentation/opentelemetry-instrumentation-genai-openai/.changelog/13.added b/instrumentation/opentelemetry-instrumentation-genai-openai/.changelog/13.added new file mode 100644 index 00000000..da7d2c64 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-genai-openai/.changelog/13.added @@ -0,0 +1 @@ +Record `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` metrics for chat completion streams. \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/chat_wrappers.py b/instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/chat_wrappers.py index 4c5dfa32..981eaa0f 100644 --- a/instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/chat_wrappers.py +++ b/instrumentation/opentelemetry-instrumentation-genai-openai/src/opentelemetry/instrumentation/genai/openai/chat_wrappers.py @@ -182,7 +182,11 @@ def __init__( invocation: InferenceInvocation, capture_content: bool, ) -> None: - super().__init__(stream) + super().__init__( + stream, + start_time_s=invocation.monotonic_start_s, + timing_target=invocation, + ) self._self_invocation = invocation self._self_choice_buffers = [] self._self_capture_content = capture_content @@ -203,7 +207,11 @@ def __init__( invocation: InferenceInvocation, capture_content: bool, ) -> None: - super().__init__(stream) + super().__init__( + stream, + start_time_s=invocation.monotonic_start_s, + timing_target=invocation, + ) self._self_invocation = invocation self._self_choice_buffers = [] self._self_capture_content = capture_content diff --git a/instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_chat_metrics.py b/instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_chat_metrics.py index 15826ac4..b71fa4b8 100644 --- a/instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_chat_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-genai-openai/tests/test_chat_metrics.py @@ -254,3 +254,172 @@ async def test_async_chat_completion_metrics( assert_all_metric_attributes( output_token_usage, latest_experimental_enabled ) + + +# TTFC and per-output-chunk histograms have a different attribute shape than +# the duration/token histograms: streaming responses in the recorded cassettes +# do not include system_fingerprint or service_tier, so we assert only the +# core gen_ai.* attributes that should always be populated. +def assert_streaming_metric_attributes( + data_point, latest_experimental_enabled, expected_request_model +): + assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] + == GenAIAttributes.GenAiOperationNameValues.CHAT.value + ) + + provider_name_attr_name = ( + "gen_ai.provider.name" + if latest_experimental_enabled + else GenAIAttributes.GEN_AI_SYSTEM + ) + assert provider_name_attr_name in data_point.attributes + assert ( + data_point.attributes[provider_name_attr_name] + == GenAIAttributes.GenAiSystemValues.OPENAI.value + ) + + assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] + == expected_request_model + ) + assert GenAIAttributes.GEN_AI_RESPONSE_MODEL in data_point.attributes + assert data_point.attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] + + assert ( + data_point.attributes[ServerAttributes.SERVER_ADDRESS] + == "api.openai.com" + ) + + +def test_chat_completion_streaming_metrics( + metric_reader, openai_client, instrument_with_content, vcr +): + """Regression test for the openai_v2 sync chat stream wrapper wiring. + + Exercises the actual ChatStreamWrapper path so that removing + timing_target=invocation in chat_wrappers.py would cause this test to + fail, not just the util-layer tests. + """ + if not is_experimental_mode(): + pytest.skip("new stream wrapper only") + + latest_experimental_enabled = is_experimental_mode() + request_model = "gpt-4" + + with vcr.use_cassette("test_chat_completion_streaming.yaml"): + response = openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model=request_model, + stream=True, + stream_options={"include_usage": True}, + ) + for _ in response: + pass + + metrics = metric_reader.get_metrics_data().resource_metrics + assert len(metrics) == 1 + metric_data = metrics[0].scope_metrics[0].metrics + + ttfc_metric = next( + ( + m + for m in metric_data + if m.name == "gen_ai.client.operation.time_to_first_chunk" + ), + None, + ) + assert ttfc_metric is not None + assert len(ttfc_metric.data.data_points) == 1 + ttfc_point = ttfc_metric.data.data_points[0] + assert ttfc_point.count == 1 + assert ttfc_point.sum >= 0 + assert_streaming_metric_attributes( + ttfc_point, latest_experimental_enabled, request_model + ) + + chunk_metric = next( + ( + m + for m in metric_data + if m.name == "gen_ai.client.operation.time_per_output_chunk" + ), + None, + ) + assert chunk_metric is not None + assert len(chunk_metric.data.data_points) == 1 + chunk_point = chunk_metric.data.data_points[0] + assert chunk_point.count >= 1 + assert chunk_point.sum >= 0 + assert_streaming_metric_attributes( + chunk_point, latest_experimental_enabled, request_model + ) + + +@pytest.mark.asyncio() +async def test_async_chat_completion_streaming_metrics( + metric_reader, async_openai_client, instrument_with_content, vcr +): + """Regression test for the openai_v2 async chat stream wrapper wiring. + + The async path has separate __init__ wiring from the sync path in + chat_wrappers.py, so it needs its own coverage. Removing + timing_target=invocation in AsyncChatStreamWrapper would still pass + every util-layer test, but would silently break TTFC and per-output-chunk + metrics for async OpenAI streaming. + """ + if not is_experimental_mode(): + pytest.skip("new stream wrapper only") + + latest_experimental_enabled = is_experimental_mode() + request_model = "gpt-4" + + with vcr.use_cassette("test_async_chat_completion_streaming.yaml"): + response = await async_openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model=request_model, + stream=True, + stream_options={"include_usage": True}, + ) + async for _ in response: + pass + + metrics = metric_reader.get_metrics_data().resource_metrics + assert len(metrics) == 1 + metric_data = metrics[0].scope_metrics[0].metrics + + ttfc_metric = next( + ( + m + for m in metric_data + if m.name == "gen_ai.client.operation.time_to_first_chunk" + ), + None, + ) + assert ttfc_metric is not None + assert len(ttfc_metric.data.data_points) == 1 + ttfc_point = ttfc_metric.data.data_points[0] + assert ttfc_point.count == 1 + assert ttfc_point.sum >= 0 + assert_streaming_metric_attributes( + ttfc_point, latest_experimental_enabled, request_model + ) + + chunk_metric = next( + ( + m + for m in metric_data + if m.name == "gen_ai.client.operation.time_per_output_chunk" + ), + None, + ) + assert chunk_metric is not None + assert len(chunk_metric.data.data_points) == 1 + chunk_point = chunk_metric.data.data_points[0] + assert chunk_point.count >= 1 + assert chunk_point.sum >= 0 + assert_streaming_metric_attributes( + chunk_point, latest_experimental_enabled, request_model + ) diff --git a/util/opentelemetry-util-genai/.changelog/13.added b/util/opentelemetry-util-genai/.changelog/13.added new file mode 100644 index 00000000..81302fdc --- /dev/null +++ b/util/opentelemetry-util-genai/.changelog/13.added @@ -0,0 +1 @@ +Record streaming `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` histograms on inference invocations, and expose a `_TimingTarget` protocol on the shared stream wrappers so per-chunk gaps are pushed inline rather than buffered in the wrapper. diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py index e05b21ab..73b70d58 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -4,7 +4,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any +from typing import TYPE_CHECKING, Any from opentelemetry._logs import Logger, LogRecord from opentelemetry.semconv._incubating.attributes import ( @@ -18,7 +18,6 @@ get_content_attributes, ) from opentelemetry.util.genai.completion_hook import CompletionHook -from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.types import ( InputMessage, MessagePart, @@ -30,8 +29,12 @@ should_emit_event, ) +if TYPE_CHECKING: + from opentelemetry.util.genai.metrics import InvocationMetricsRecorder + # TODO: Migrate to GenAI constants once available in semconv package _GEN_AI_REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens" +_GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK = "gen_ai.response.time_to_first_chunk" class InferenceInvocation(GenAIInvocation): @@ -93,6 +96,9 @@ def __init__( self.cache_creation_input_tokens: int | None = None self.cache_read_input_tokens: int | None = None self.tool_definitions: list[ToolDefinition] | None = None + # Streaming timing fields (populated by stream wrappers) + self.ttfc_seconds: float | None = None + self.chunk_gap_seconds: list[float] = [] self._start(self._get_base_attributes()) def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: @@ -161,6 +167,10 @@ def _get_attributes(self) -> dict[str, Any]: _GEN_AI_REASONING_OUTPUT_TOKENS, self.thinking_tokens, ), + ( + _GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK, + self.ttfc_seconds, + ), ) attrs.update({k: v for k, v in optional_attrs if v is not None}) return attrs @@ -172,6 +182,29 @@ def _get_metric_attributes(self) -> dict[str, Any]: attrs.update(self.metric_attributes) return attrs + def _record_chunk_gap(self, gap: float) -> None: + """Buffer a time-per-output-chunk gap (in seconds). + + Called by the stream wrapper as each chunk after the first arrives. + Buffered gaps are drained by ``_consume_streaming_timing`` when the + invocation stops. + """ + self.chunk_gap_seconds.append(gap) + + def _consume_streaming_timing( + self, + ) -> tuple[float | None, list[float]]: + """Return TTFC and chunk gaps, then reset them on the invocation. + + Called by InvocationMetricsRecorder so the timing values are emitted + once and not held past finalization. + """ + ttfc = self.ttfc_seconds + gaps = self.chunk_gap_seconds + self.ttfc_seconds = None + self.chunk_gap_seconds = [] + return ttfc, gaps + def _get_metric_token_counts(self) -> dict[str, int]: counts: dict[str, int] = {} if self.input_tokens is not None: diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py index f58c3852..ec52453c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py @@ -87,6 +87,17 @@ def __init__( self._context_token: ContextToken | None = None self._monotonic_start_s: float | None = None + @property + def monotonic_start_s(self) -> float | None: + """Monotonic timestamp (seconds) when this invocation started. + + This timestamp is the anchor for the streaming time-to-first-chunk + (TTFC) metric. Instrumentations MUST NOT perform meaningful work + between ``start_inference()`` and the wrapped SDK call: anything + heavier than building attribute dicts will silently inflate TTFC. + """ + return self._monotonic_start_s + def _start(self, attributes: dict[str, Any] | None = None) -> None: """Start the invocation span and attach it to the current context. diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py index b79d4472..77e64e5e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -4,6 +4,14 @@ from opentelemetry.metrics import Histogram, Meter from opentelemetry.semconv._incubating.metrics import gen_ai_metrics +# TODO: Migrate to GenAI constants once available in semconv package +_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK = ( + "gen_ai.client.operation.time_to_first_chunk" +) +_GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK = ( + "gen_ai.client.operation.time_per_output_chunk" +) + _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ 0.01, 0.02, @@ -55,3 +63,29 @@ def create_token_histogram(meter: Meter) -> Histogram: unit="{token}", explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, ) + + +def create_ttfc_histogram(meter: Meter) -> Histogram: + return meter.create_histogram( + name=_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, + description=( + "Time to receive the first chunk, measured from when the client " + "issues the generation request to when the first chunk is " + "received in the response stream." + ), + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) + + +def create_time_per_chunk_histogram(meter: Meter) -> Histogram: + return meter.create_histogram( + name=_GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK, + description=( + "Time per output chunk, recorded for each chunk received after " + "the first one, measured as the time elapsed from the end of " + "the previous chunk to the end of the current chunk." + ), + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 7d0ad944..814ad00d 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -14,9 +14,12 @@ ) from opentelemetry.util.genai.instruments import ( create_duration_histogram, + create_time_per_chunk_histogram, create_token_histogram, + create_ttfc_histogram, ) +from ._inference_invocation import InferenceInvocation from ._invocation import GenAIInvocation @@ -26,6 +29,10 @@ class InvocationMetricsRecorder: def __init__(self, meter: Meter): self._duration_histogram: Histogram = create_duration_histogram(meter) self._token_histogram: Histogram = create_token_histogram(meter) + self._ttfc_histogram: Histogram = create_ttfc_histogram(meter) + self._time_per_chunk_histogram: Histogram = ( + create_time_per_chunk_histogram(meter) + ) def record(self, invocation: GenAIInvocation) -> None: """Record duration and token metrics for an invocation if possible.""" @@ -33,9 +40,9 @@ def record(self, invocation: GenAIInvocation) -> None: token_counts = invocation._get_metric_token_counts() duration_seconds: Optional[float] = None - if invocation._monotonic_start_s is not None: + if invocation.monotonic_start_s is not None: duration_seconds = max( - timeit.default_timer() - invocation._monotonic_start_s, + timeit.default_timer() - invocation.monotonic_start_s, 0.0, ) @@ -53,5 +60,21 @@ def record(self, invocation: GenAIInvocation) -> None: context=invocation._span_context, ) + # Streaming timing metrics + if isinstance(invocation, InferenceInvocation): + ttfc, gaps = invocation._consume_streaming_timing() + if ttfc is not None: + self._ttfc_histogram.record( + ttfc, + attributes=attributes, + context=invocation._span_context, + ) + for gap in gaps: + self._time_per_chunk_histogram.record( + gap, + attributes=attributes, + context=invocation._span_context, + ) + __all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py index 88181146..b0c918b4 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging +import timeit from abc import ABCMeta, abstractmethod from types import TracebackType from typing import ( @@ -30,6 +31,22 @@ def __init__(self, wrapped: object) -> None: ... _logger = logging.getLogger(__name__) +class _TimingTarget(Protocol): + """Internal protocol for objects that receive streaming timing. + + This is the wrapper-to-invocation seam inside this package and is not + a public extension point; third parties should not implement it + directly. ``ttfc_seconds`` is set once by the wrapper at + finalization. Each inter-chunk gap is pushed through + ``_record_chunk_gap`` as soon as it is measured so the wrapper does + not need to buffer measurements. + """ + + ttfc_seconds: float | None + + def _record_chunk_gap(self, gap: float) -> None: ... + + class _StreamWrapperMeta(ABCMeta, type(_ObjectProxy)): """Metaclass compatible with wrapt's proxy type and ABC hooks.""" @@ -64,11 +81,25 @@ class SyncStreamWrapper( internally by the wrapper lifecycle and are not part of the public API. """ - def __init__(self, stream: _SyncStream[ChunkT]): + def __init__( + self, + stream: _SyncStream[ChunkT], + start_time_s: float | None = None, + timing_target: _TimingTarget | None = None, + ) -> None: + if timing_target is not None and start_time_s is None: + raise ValueError( + "start_time_s is required when timing_target is provided" + ) super().__init__(stream) self._self_stream = stream self._self_iterator = iter(stream) self._self_finalized = False + self._self_start_time_s = start_time_s + self._self_timing_target = timing_target + self._self_ttfc_seconds: float | None = None + self._self_first_chunk_at: float | None = None + self._self_last_chunk_at: float | None = None def __enter__(self): return self @@ -116,22 +147,57 @@ def __next__(self) -> ChunkT: except Exception as error: self._safe_finalize_failure(error) raise + + if ( + self._self_start_time_s is not None + or self._self_timing_target is not None + ): + t_after_read = timeit.default_timer() + if self._self_first_chunk_at is None: + self._self_first_chunk_at = t_after_read + if self._self_start_time_s is not None: + self._self_ttfc_seconds = ( + t_after_read - self._self_start_time_s + ) + else: + previous = self._self_last_chunk_at + if previous is not None: + self._safe_record_chunk_gap(t_after_read - previous) + self._self_last_chunk_at = t_after_read + try: self._process_chunk(chunk) except Exception as error: # pylint: disable=broad-exception-caught self._handle_process_chunk_error(error) return chunk + def _safe_record_chunk_gap(self, gap: float) -> None: + if self._self_timing_target is None: + return + try: + self._self_timing_target._record_chunk_gap(gap) + except Exception: # pylint: disable=broad-exception-caught + _logger.debug( + "GenAI stream instrumentation error recording chunk gap", + exc_info=True, + ) + + def _sync_timing_to_target(self) -> None: + if self._self_timing_target is not None: + self._self_timing_target.ttfc_seconds = self._self_ttfc_seconds + def _finalize_success(self) -> None: if self._self_finalized: return self._self_finalized = True + self._sync_timing_to_target() self._on_stream_end() def _finalize_failure(self, error: BaseException) -> None: if self._self_finalized: return self._self_finalized = True + self._sync_timing_to_target() self._on_stream_error(error) def _safe_finalize_success(self) -> None: @@ -191,11 +257,25 @@ class AsyncStreamWrapper( are owned by this base class. """ - def __init__(self, stream: _AsyncStream[ChunkT]): + def __init__( + self, + stream: _AsyncStream[ChunkT], + start_time_s: float | None = None, + timing_target: _TimingTarget | None = None, + ) -> None: + if timing_target is not None and start_time_s is None: + raise ValueError( + "start_time_s is required when timing_target is provided" + ) super().__init__(stream) self._self_stream = stream self._self_aiter = aiter(stream) self._self_finalized = False + self._self_start_time_s = start_time_s + self._self_timing_target = timing_target + self._self_ttfc_seconds: float | None = None + self._self_first_chunk_at: float | None = None + self._self_last_chunk_at: float | None = None async def __aenter__(self): return self @@ -245,22 +325,57 @@ async def __anext__(self) -> ChunkT: except Exception as error: self._safe_finalize_failure(error) raise + + if ( + self._self_start_time_s is not None + or self._self_timing_target is not None + ): + t_after_read = timeit.default_timer() + if self._self_first_chunk_at is None: + self._self_first_chunk_at = t_after_read + if self._self_start_time_s is not None: + self._self_ttfc_seconds = ( + t_after_read - self._self_start_time_s + ) + else: + previous = self._self_last_chunk_at + if previous is not None: + self._safe_record_chunk_gap(t_after_read - previous) + self._self_last_chunk_at = t_after_read + try: self._process_chunk(chunk) except Exception as error: # pylint: disable=broad-exception-caught self._handle_process_chunk_error(error) return chunk + def _safe_record_chunk_gap(self, gap: float) -> None: + if self._self_timing_target is None: + return + try: + self._self_timing_target._record_chunk_gap(gap) + except Exception: # pylint: disable=broad-exception-caught + _logger.debug( + "GenAI stream instrumentation error recording chunk gap", + exc_info=True, + ) + + def _sync_timing_to_target(self) -> None: + if self._self_timing_target is not None: + self._self_timing_target.ttfc_seconds = self._self_ttfc_seconds + def _finalize_success(self) -> None: if self._self_finalized: return self._self_finalized = True + self._sync_timing_to_target() self._on_stream_end() def _finalize_failure(self, error: BaseException) -> None: if self._self_finalized: return self._self_finalized = True + self._sync_timing_to_target() self._on_stream_error(error) def _safe_finalize_success(self) -> None: diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py index 37a13cdd..70a208c7 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_metrics.py +++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py @@ -392,3 +392,108 @@ def test_fail_tool_records_duration_with_error(self) -> None: ) self.assertAlmostEqual(duration_point.sum, 1.5, places=3) self.assertNotIn("gen_ai.client.token.usage", metrics) + + def test_stop_inference_records_ttfc_and_chunk_gaps(self) -> None: + """Verify TTFC and time_per_chunk histograms are recorded for streaming.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + with patch("timeit.default_timer", return_value=1000.0): + invocation = handler.start_inference("prov", request_model="model") + + # Simulate streaming timing + invocation.ttfc_seconds = 0.35 + invocation.chunk_gap_seconds = [0.05, 0.08, 0.12] + + with patch("timeit.default_timer", return_value=1002.0): + invocation.stop() + + # Timing fields are reset after recording so they aren't double-counted + self.assertIsNone(invocation.ttfc_seconds) + self.assertEqual(invocation.chunk_gap_seconds, []) + + metrics = self._harvest_metrics() + + # TTFC histogram + self.assertIn("gen_ai.client.operation.time_to_first_chunk", metrics) + ttfc_points = metrics["gen_ai.client.operation.time_to_first_chunk"] + self.assertEqual(len(ttfc_points), 1) + self.assertAlmostEqual(ttfc_points[0].sum, 0.35, places=3) + self.assertEqual( + ttfc_points[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + GenAI.GenAiOperationNameValues.CHAT.value, + ) + self.assertEqual( + ttfc_points[0].attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov" + ) + self.assertEqual( + ttfc_points[0].attributes[GenAI.GEN_AI_REQUEST_MODEL], "model" + ) + + # Time per chunk histogram (one record per gap) + self.assertIn("gen_ai.client.operation.time_per_output_chunk", metrics) + chunk_points = metrics["gen_ai.client.operation.time_per_output_chunk"] + self.assertEqual(len(chunk_points), 1) + # Sum of all gaps: 0.05 + 0.08 + 0.12 = 0.25 + self.assertAlmostEqual(chunk_points[0].sum, 0.25, places=3) + # Count should be 3 (one per gap) + self.assertEqual(chunk_points[0].count, 3) + self.assertEqual( + chunk_points[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + GenAI.GenAiOperationNameValues.CHAT.value, + ) + self.assertEqual( + chunk_points[0].attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov" + ) + self.assertEqual( + chunk_points[0].attributes[GenAI.GEN_AI_REQUEST_MODEL], "model" + ) + + def test_ttfc_span_attribute_recorded(self) -> None: + """Verify gen_ai.response.time_to_first_chunk lands on the span.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + with patch("timeit.default_timer", return_value=1000.0): + invocation = handler.start_inference("prov", request_model="model") + + invocation.ttfc_seconds = 0.42 + + with patch("timeit.default_timer", return_value=1002.0): + invocation.stop() + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + attrs = spans[0].attributes + self.assertIn("gen_ai.response.time_to_first_chunk", attrs) + self.assertAlmostEqual( + attrs["gen_ai.response.time_to_first_chunk"], 0.42, places=3 + ) + + def test_no_ttfc_recorded_for_non_streaming(self) -> None: + """Non-streaming calls should not emit TTFC or chunk metrics.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + with patch("timeit.default_timer", return_value=1000.0): + invocation = handler.start_inference("prov", request_model="model") + invocation.input_tokens = 10 + invocation.output_tokens = 20 + + with patch("timeit.default_timer", return_value=1001.0): + invocation.stop() + + metrics = self._harvest_metrics() + # Duration and tokens should exist + self.assertIn("gen_ai.client.operation.duration", metrics) + self.assertIn("gen_ai.client.token.usage", metrics) + # TTFC and chunk metrics should NOT exist (no data points) + self.assertNotIn( + "gen_ai.client.operation.time_to_first_chunk", metrics + ) + self.assertNotIn( + "gen_ai.client.operation.time_per_output_chunk", metrics + ) diff --git a/util/opentelemetry-util-genai/tests/test_stream.py b/util/opentelemetry-util-genai/tests/test_stream.py index dfbb4795..4ba3fc2e 100644 --- a/util/opentelemetry-util-genai/tests/test_stream.py +++ b/util/opentelemetry-util-genai/tests/test_stream.py @@ -5,6 +5,7 @@ import asyncio import inspect +from unittest.mock import patch import pytest @@ -540,3 +541,219 @@ async def exercise(): assert not wrapper._self_failures asyncio.run(exercise()) + + +# --- Timing measurement tests --- + + +def test_sync_stream_wrapper_records_ttfc(): + """TTFC is computed from start_time_s to first chunk arrival.""" + + stream = _FakeSyncStream(chunks=["a", "b", "c"]) + wrapper = _TestSyncStreamWrapper.__new__(_TestSyncStreamWrapper) + SyncStreamWrapper.__init__(wrapper, stream, start_time_s=99.0) + wrapper._self_processed = [] + wrapper._self_stop_count = 0 + wrapper._self_failures = [] + + # Iterate with controlled time. Only after-read timestamps are + # consumed now that the wrapper no longer measures blocking-read time. + read_times = iter([101.2, 101.8, 102.1]) + with patch("timeit.default_timer", side_effect=read_times): + chunks = list(wrapper) + + assert chunks == ["a", "b", "c"] + # TTFC = first chunk after_read (101.2) - start_time_s (99.0) = 2.2 + assert wrapper._self_ttfc_seconds == pytest.approx(2.2) + # Inter-chunk gaps: 101.8 - 101.2 = 0.6, 102.1 - 101.8 = 0.3 + assert wrapper._self_last_chunk_at == pytest.approx(102.1) + + +def test_sync_stream_wrapper_no_ttfc_without_start_time(): + """Without start_time_s, TTFC stays None.""" + stream = _FakeSyncStream(chunks=["a", "b"]) + wrapper = _TestSyncStreamWrapper(stream) + + list(wrapper) + + assert wrapper._self_ttfc_seconds is None + # With timing disabled (no start_time_s, no timing_target), all + # timing state stays None; the chunk-arrival timer is not invoked. + assert wrapper._self_last_chunk_at is None + + +def test_sync_stream_wrapper_single_chunk_no_gaps(): + """Single-chunk stream has TTFC but no gaps.""" + + stream = _FakeSyncStream(chunks=["only"]) + wrapper = _TestSyncStreamWrapper.__new__(_TestSyncStreamWrapper) + SyncStreamWrapper.__init__(wrapper, stream, start_time_s=50.0) + wrapper._self_processed = [] + wrapper._self_stop_count = 0 + wrapper._self_failures = [] + + read_times = iter([60.5]) + with patch("timeit.default_timer", side_effect=read_times): + chunks = list(wrapper) + + assert chunks == ["only"] + assert wrapper._self_ttfc_seconds == pytest.approx(10.5) # 60.5 - 50.0 + # First chunk records arrival time but emits no gap. + assert wrapper._self_last_chunk_at == pytest.approx(60.5) + + +def test_sync_stream_wrapper_error_before_first_chunk_no_ttfc(): + """If stream errors before first chunk, no TTFC is recorded.""" + stream = _FakeSyncStream(error=RuntimeError("network")) + wrapper = _TestSyncStreamWrapper.__new__(_TestSyncStreamWrapper) + SyncStreamWrapper.__init__(wrapper, stream, start_time_s=10.0) + wrapper._self_processed = [] + wrapper._self_stop_count = 0 + wrapper._self_failures = [] + + with pytest.raises(RuntimeError, match="network"): + next(wrapper) + + assert wrapper._self_ttfc_seconds is None + assert wrapper._self_last_chunk_at is None + + +def test_async_stream_wrapper_records_ttfc(): + """Async wrapper records TTFC and chunk gaps.""" + + async def exercise(): + stream = _FakeAsyncStream(chunks=["x", "y", "z"]) + wrapper = _TestAsyncStreamWrapper.__new__(_TestAsyncStreamWrapper) + AsyncStreamWrapper.__init__(wrapper, stream, start_time_s=200.0) + wrapper._self_processed = [] + wrapper._self_stop_count = 0 + wrapper._self_failures = [] + + read_times = iter([201.3, 202.0, 202.2]) + with patch("timeit.default_timer", side_effect=read_times): + chunks = [] + async for chunk in wrapper: + chunks.append(chunk) + + assert chunks == ["x", "y", "z"] + assert wrapper._self_ttfc_seconds == pytest.approx( + 1.3 + ) # 201.3 - 200.0 + # Inter-chunk gaps: 202.0 - 201.3 = 0.7, 202.2 - 202.0 = 0.2 + assert wrapper._self_last_chunk_at == pytest.approx(202.2) + + asyncio.run(exercise()) + + +# --- timing_target sync tests --- + + +class _FakeTimingTarget: + def __init__(self): + self.ttfc_seconds = None + self.chunk_gap_seconds = [] + + def _record_chunk_gap(self, gap): + self.chunk_gap_seconds.append(gap) + + +class _TimingTargetSyncWrapper(SyncStreamWrapper): + def __init__(self, stream, **kwargs): + super().__init__(stream, **kwargs) + self._self_processed = [] + self._self_end_target_ttfc = None + + def _process_chunk(self, chunk): + self._self_processed.append(chunk) + + def _on_stream_end(self): + if self._self_timing_target is not None: + self._self_end_target_ttfc = self._self_timing_target.ttfc_seconds + + def _on_stream_error(self, error): + if self._self_timing_target is not None: + self._self_end_target_ttfc = self._self_timing_target.ttfc_seconds + + +def test_timing_target_receives_values_on_success(): + mock_patch = patch + + target = _FakeTimingTarget() + stream = _FakeSyncStream(chunks=["a", "b", "c"]) + wrapper = _TimingTargetSyncWrapper( + stream, start_time_s=100.0, timing_target=target + ) + + times = iter([100.5, 101.0, 101.3]) + with mock_patch("timeit.default_timer", side_effect=times): + for _ in wrapper: + pass + + assert target.ttfc_seconds == pytest.approx(0.5) + # Inter-chunk gaps: 101.0 - 100.5 = 0.5, 101.3 - 101.0 = 0.3 + assert target.chunk_gap_seconds == pytest.approx([0.5, 0.3]) + + +def test_timing_target_without_start_time_raises(): + """timing_target without start_time_s is a misconfiguration: TTFC has no + meaning without a start, so the constructor surfaces it loudly rather than + silently emitting no TTFC.""" + target = _FakeTimingTarget() + stream = _FakeSyncStream(chunks=["a"]) + with pytest.raises(ValueError, match="start_time_s is required"): + _TimingTargetSyncWrapper(stream, timing_target=target) + + +def test_async_timing_target_without_start_time_raises(): + target = _FakeTimingTarget() + stream = _FakeAsyncStream(chunks=["a"]) + wrapper = _TestAsyncStreamWrapper.__new__(_TestAsyncStreamWrapper) + with pytest.raises(ValueError, match="start_time_s is required"): + AsyncStreamWrapper.__init__(wrapper, stream, timing_target=target) + + +def test_timing_target_populated_before_on_stream_end(): + mock_patch = patch + + target = _FakeTimingTarget() + stream = _FakeSyncStream(chunks=["x"]) + wrapper = _TimingTargetSyncWrapper( + stream, start_time_s=50.0, timing_target=target + ) + + times = iter([50.2]) + with mock_patch("timeit.default_timer", side_effect=times): + for _ in wrapper: + pass + + # Hook captured target.ttfc_seconds when _on_stream_end ran + assert wrapper._self_end_target_ttfc == pytest.approx(0.2) + + +def test_timing_target_populated_before_on_stream_error(): + mock_patch = patch + + target = _FakeTimingTarget() + error = RuntimeError("fail") + stream = _FakeSyncStream(chunks=["x"], error=error) + wrapper = _TimingTargetSyncWrapper( + stream, start_time_s=50.0, timing_target=target + ) + + times = iter([50.2]) + with mock_patch("timeit.default_timer", side_effect=times): + with pytest.raises(RuntimeError): + for _ in wrapper: + pass + + assert wrapper._self_end_target_ttfc == pytest.approx(0.2) + + +def test_no_timing_target_still_works(): + stream = _FakeSyncStream(chunks=["a", "b"]) + wrapper = _TimingTargetSyncWrapper(stream, start_time_s=10.0) + + for _ in wrapper: + pass + + assert wrapper._self_ttfc_seconds is not None