Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Record `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` metrics for chat completion streams.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ def __init__(
invocation: InferenceInvocation,
capture_content: bool,
) -> None:
super().__init__(stream)
super().__init__(
stream,
start_time_s=invocation.monotonic_start_s,
timing_target=invocation,
)
self._self_invocation = invocation
self._self_choice_buffers = []
self._self_capture_content = capture_content
Expand All @@ -203,7 +207,11 @@ def __init__(
invocation: InferenceInvocation,
capture_content: bool,
) -> None:
super().__init__(stream)
super().__init__(
stream,
start_time_s=invocation.monotonic_start_s,
timing_target=invocation,
)
self._self_invocation = invocation
self._self_choice_buffers = []
self._self_capture_content = capture_content
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,172 @@ async def test_async_chat_completion_metrics(
assert_all_metric_attributes(
output_token_usage, latest_experimental_enabled
)


# TTFC and per-output-chunk histograms have a different attribute shape than
# the duration/token histograms: streaming responses in the recorded cassettes
# do not include system_fingerprint or service_tier, so we assert only the
# core gen_ai.* attributes that should always be populated.
def assert_streaming_metric_attributes(
    data_point, latest_experimental_enabled, expected_request_model
):
    """Assert the core ``gen_ai.*`` attributes on a streaming data point.

    Checks the operation name, the provider attribute (whose key depends on
    ``latest_experimental_enabled``), the request model, the presence of a
    non-empty response model, and the server address.
    """
    attrs = data_point.attributes

    # Operation name must be present and equal to the chat operation value.
    operation_key = GenAIAttributes.GEN_AI_OPERATION_NAME
    assert operation_key in attrs
    assert (
        attrs[operation_key]
        == GenAIAttributes.GenAiOperationNameValues.CHAT.value
    )

    # The provider is reported under a different key in experimental mode.
    if latest_experimental_enabled:
        provider_key = "gen_ai.provider.name"
    else:
        provider_key = GenAIAttributes.GEN_AI_SYSTEM
    assert provider_key in attrs
    assert (
        attrs[provider_key] == GenAIAttributes.GenAiSystemValues.OPENAI.value
    )

    # Request model must match exactly; response model only has to be
    # present and truthy.
    request_model_key = GenAIAttributes.GEN_AI_REQUEST_MODEL
    assert request_model_key in attrs
    assert attrs[request_model_key] == expected_request_model

    response_model_key = GenAIAttributes.GEN_AI_RESPONSE_MODEL
    assert response_model_key in attrs
    assert attrs[response_model_key]

    assert attrs[ServerAttributes.SERVER_ADDRESS] == "api.openai.com"


def test_chat_completion_streaming_metrics(
    metric_reader, openai_client, instrument_with_content, vcr
):
    """Regression test for the openai_v2 sync chat stream wrapper wiring.

    Drives a real streamed chat completion through the ChatStreamWrapper
    path, so dropping timing_target=invocation in chat_wrappers.py makes
    this test fail rather than only the util-layer tests.
    """
    if not is_experimental_mode():
        pytest.skip("new stream wrapper only")

    experimental = is_experimental_mode()
    model_name = "gpt-4"

    with vcr.use_cassette("test_chat_completion_streaming.yaml"):
        stream = openai_client.chat.completions.create(
            messages=USER_ONLY_PROMPT,
            model=model_name,
            stream=True,
            stream_options={"include_usage": True},
        )
        # Consume every chunk; the assertions below inspect the metrics
        # exported after the stream has been fully iterated.
        for _chunk in stream:
            pass

    resource_metrics = metric_reader.get_metrics_data().resource_metrics
    assert len(resource_metrics) == 1
    exported = resource_metrics[0].scope_metrics[0].metrics

    def first_metric_named(name):
        # First exported metric with this name, or None when absent.
        for candidate in exported:
            if candidate.name == name:
                return candidate
        return None

    # Time to first chunk: exactly one observation for the single stream.
    ttfc_metric = first_metric_named(
        "gen_ai.client.operation.time_to_first_chunk"
    )
    assert ttfc_metric is not None
    ttfc_points = ttfc_metric.data.data_points
    assert len(ttfc_points) == 1
    ttfc_point = ttfc_points[0]
    assert ttfc_point.count == 1
    assert ttfc_point.sum >= 0
    assert_streaming_metric_attributes(ttfc_point, experimental, model_name)

    # Time per output chunk: at least one gap recorded on a single point.
    chunk_metric = first_metric_named(
        "gen_ai.client.operation.time_per_output_chunk"
    )
    assert chunk_metric is not None
    chunk_points = chunk_metric.data.data_points
    assert len(chunk_points) == 1
    chunk_point = chunk_points[0]
    assert chunk_point.count >= 1
    assert chunk_point.sum >= 0
    assert_streaming_metric_attributes(chunk_point, experimental, model_name)


@pytest.mark.asyncio()
async def test_async_chat_completion_streaming_metrics(
    metric_reader, async_openai_client, instrument_with_content, vcr
):
    """Regression test for the openai_v2 async chat stream wrapper wiring.

    The async wrapper has its own __init__ wiring in chat_wrappers.py,
    separate from the sync one, so it is covered independently. Dropping
    timing_target=invocation in AsyncChatStreamWrapper would leave every
    util-layer test green while silently breaking TTFC and per-output-chunk
    metrics for async OpenAI streaming.
    """
    if not is_experimental_mode():
        pytest.skip("new stream wrapper only")

    experimental = is_experimental_mode()
    model_name = "gpt-4"

    with vcr.use_cassette("test_async_chat_completion_streaming.yaml"):
        stream = await async_openai_client.chat.completions.create(
            messages=USER_ONLY_PROMPT,
            model=model_name,
            stream=True,
            stream_options={"include_usage": True},
        )
        # Consume every chunk; the assertions below inspect the metrics
        # exported after the stream has been fully iterated.
        async for _chunk in stream:
            pass

    resource_metrics = metric_reader.get_metrics_data().resource_metrics
    assert len(resource_metrics) == 1
    exported = resource_metrics[0].scope_metrics[0].metrics

    def first_metric_named(name):
        # First exported metric with this name, or None when absent.
        for candidate in exported:
            if candidate.name == name:
                return candidate
        return None

    # Time to first chunk: exactly one observation for the single stream.
    ttfc_metric = first_metric_named(
        "gen_ai.client.operation.time_to_first_chunk"
    )
    assert ttfc_metric is not None
    ttfc_points = ttfc_metric.data.data_points
    assert len(ttfc_points) == 1
    ttfc_point = ttfc_points[0]
    assert ttfc_point.count == 1
    assert ttfc_point.sum >= 0
    assert_streaming_metric_attributes(ttfc_point, experimental, model_name)

    # Time per output chunk: at least one gap recorded on a single point.
    chunk_metric = first_metric_named(
        "gen_ai.client.operation.time_per_output_chunk"
    )
    assert chunk_metric is not None
    chunk_points = chunk_metric.data.data_points
    assert len(chunk_points) == 1
    chunk_point = chunk_points[0]
    assert chunk_point.count >= 1
    assert chunk_point.sum >= 0
    assert_streaming_metric_attributes(chunk_point, experimental, model_name)
1 change: 1 addition & 0 deletions util/opentelemetry-util-genai/.changelog/13.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Record streaming `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` histograms on inference invocations, and expose a `_TimingTarget` protocol on the shared stream wrappers so per-chunk gaps are pushed inline rather than buffered in the wrapper.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any
from typing import TYPE_CHECKING, Any

from opentelemetry._logs import Logger, LogRecord
from opentelemetry.semconv._incubating.attributes import (
Expand All @@ -18,7 +18,6 @@
get_content_attributes,
)
from opentelemetry.util.genai.completion_hook import CompletionHook
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import (
InputMessage,
MessagePart,
Expand All @@ -30,8 +29,12 @@
should_emit_event,
)

if TYPE_CHECKING:
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder

# TODO: Migrate to GenAI constants once available in semconv package
_GEN_AI_REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens"
_GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK = "gen_ai.response.time_to_first_chunk"


class InferenceInvocation(GenAIInvocation):
Expand Down Expand Up @@ -93,6 +96,9 @@ def __init__(
self.cache_creation_input_tokens: int | None = None
self.cache_read_input_tokens: int | None = None
self.tool_definitions: list[ToolDefinition] | None = None
# Streaming timing fields (populated by stream wrappers)
self.ttfc_seconds: float | None = None
self.chunk_gap_seconds: list[float] = []
self._start(self._get_base_attributes())

def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]:
Expand Down Expand Up @@ -161,6 +167,10 @@ def _get_attributes(self) -> dict[str, Any]:
_GEN_AI_REASONING_OUTPUT_TOKENS,
self.thinking_tokens,
),
(
_GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK,
self.ttfc_seconds,
Comment thread
Nik-Reddy marked this conversation as resolved.
),
)
attrs.update({k: v for k, v in optional_attrs if v is not None})
return attrs
Expand All @@ -172,6 +182,29 @@ def _get_metric_attributes(self) -> dict[str, Any]:
attrs.update(self.metric_attributes)
return attrs

def _record_chunk_gap(self, gap: float) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this logic be pretty similar across different invocations? Probably can abstract this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thread as _TimingTarget above. If we lift the streaming timing state up to GenAIInvocation, both _record_chunk_gap and _consume_streaming_timing move with it and the Protocol goes away.

If we keep the Protocol, those methods stay on InferenceInvocation since that is the only invocation that streams today. Will follow whichever direction you pick on the thread above and batch it with the next push.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaning toward keeping the Protocol for this PR, batching the decision with the next push.

The trade I see:

  • Protocol: stream.py stays decoupled from the concrete invocation types, and EmbeddingInvocation, ToolInvocation, AgentInvocation, WorkflowInvocation do not carry ttfc_seconds and the chunk-gap buffer as dead state. A future streaming invocation type implements the Protocol surface (ttfc_seconds: float | None attr and _record_chunk_gap method) and gets type-checking from pyright.
  • On GenAIInvocation: one doc site for the streaming-timing contract, but every invocation pays for state it does not use, and there is no signal that distinguishes streaming from non-streaming invocations at the type level.

Threads at stream.py:46 and _inference_invocation.py:185 are the same call, so I will follow your direction here for all three.

Happy to flip to the base-class version if you prefer the single-site documentation.

"""Buffer a time-per-output-chunk gap (in seconds).

Called by the stream wrapper as each chunk after the first arrives.
Buffered gaps are drained by ``_consume_streaming_timing`` when the
invocation stops.
"""
self.chunk_gap_seconds.append(gap)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there's no limit to this unbounded list, which might be a concern if the stream is really long. Can we record to the histogram directly every time instead?

Copy link
Copy Markdown
Contributor Author

@Nik-Reddy Nik-Reddy May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One snag with inline recording: gen_ai.response.model is only set after the first response chunk arrives, so the gap between chunks 1 and 2 would be emitted without it while later gaps would carry it (the same stream would end up split across two different attribute sets).

Options I see:

    1. Inline + drop response.model from this metric.
    2. Inline + snapshot attributes at first-chunk time.
    3. Sliding-window cap on the buffer, keep finalize record.
    4. Keep as is, document the memory shape.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lzchen I'm leaning toward option 4 (keep as is, document the memory shape) for this PR. The buffer lifetime is bounded by the invocation, and the per-invocation list length is the SDK's chunk count for that one call. For openai-v2 chat completion streams in scope here, that stays in the thousands of floats per invocation and is released at stop. Can add a short comment near chunk_gap_seconds documenting the shape.

Option 2 (inline record with snapshot attributes at first-chunk) is the cleaner path if we later target agent-style indefinite streams. Happy to switch to option 2 here if you would rather not defer it.


def _consume_streaming_timing(
self,
) -> tuple[float | None, list[float]]:
"""Return TTFC and chunk gaps, then reset them on the invocation.

Called by InvocationMetricsRecorder so the timing values are emitted
once and not held past finalization.
"""
ttfc = self.ttfc_seconds
gaps = self.chunk_gap_seconds
self.ttfc_seconds = None
self.chunk_gap_seconds = []
return ttfc, gaps

def _get_metric_token_counts(self) -> dict[str, int]:
counts: dict[str, int] = {}
if self.input_tokens is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ def __init__(
self._context_token: ContextToken | None = None
self._monotonic_start_s: float | None = None

@property
def monotonic_start_s(self) -> float | None:
    """Monotonic start timestamp of this invocation, in seconds.

    Read-only view of ``_monotonic_start_s``; ``None`` when no start time
    has been stored yet.

    The streaming time-to-first-chunk (TTFC) metric is measured from this
    value. Instrumentations MUST NOT do meaningful work between
    ``start_inference()`` and the wrapped SDK call: anything heavier than
    building attribute dicts silently inflates the reported TTFC.
    """
    return self._monotonic_start_s

def _start(self, attributes: dict[str, Any] | None = None) -> None:
"""Start the invocation span and attach it to the current context.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics

# TODO: Migrate to GenAI constants once available in semconv package
# Until then, the metric names of the two streaming histograms are spelled
# out here as string literals: time to first chunk and time per output chunk.
_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK = (
    "gen_ai.client.operation.time_to_first_chunk"
)
_GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK = (
    "gen_ai.client.operation.time_per_output_chunk"
)

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
0.02,
Expand Down Expand Up @@ -55,3 +63,29 @@ def create_token_histogram(meter: Meter) -> Histogram:
unit="{token}",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
)


def create_ttfc_histogram(meter: Meter) -> Histogram:
    """Create the time-to-first-chunk histogram on ``meter``.

    The instrument records seconds and advertises the shared operation
    duration bucket boundaries as its advisory explicit buckets.
    """
    description = (
        "Time to receive the first chunk, measured from when the client "
        "issues the generation request to when the first chunk is "
        "received in the response stream."
    )
    return meter.create_histogram(
        name=_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK,
        description=description,
        unit="s",
        explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
    )


def create_time_per_chunk_histogram(meter: Meter) -> Histogram:
    """Create the time-per-output-chunk histogram on ``meter``.

    The instrument records seconds and advertises the shared operation
    duration bucket boundaries as its advisory explicit buckets.
    """
    description = (
        "Time per output chunk, recorded for each chunk received after "
        "the first one, measured as the time elapsed from the end of "
        "the previous chunk to the end of the current chunk."
    )
    return meter.create_histogram(
        name=_GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK,
        description=description,
        unit="s",
        explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
    )
Loading