Add streaming timing metrics to generic stream wrappers #13
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Record `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` metrics for chat completion streams. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Record streaming `gen_ai.client.operation.time_to_first_chunk` and `gen_ai.client.operation.time_per_output_chunk` histograms on inference invocations, and expose a `_TimingTarget` protocol on the shared stream wrappers so per-chunk gaps are pushed inline rather than buffered in the wrapper. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass, field | ||
| from typing import Any | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from opentelemetry._logs import Logger, LogRecord | ||
| from opentelemetry.semconv._incubating.attributes import ( | ||
|
|
@@ -18,7 +18,6 @@ | |
| get_content_attributes, | ||
| ) | ||
| from opentelemetry.util.genai.completion_hook import CompletionHook | ||
| from opentelemetry.util.genai.metrics import InvocationMetricsRecorder | ||
| from opentelemetry.util.genai.types import ( | ||
| InputMessage, | ||
| MessagePart, | ||
|
|
@@ -30,8 +29,12 @@ | |
| should_emit_event, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from opentelemetry.util.genai.metrics import InvocationMetricsRecorder | ||
|
|
||
| # TODO: Migrate to GenAI constants once available in semconv package | ||
| _GEN_AI_REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens" | ||
| _GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK = "gen_ai.response.time_to_first_chunk" | ||
|
|
||
|
|
||
| class InferenceInvocation(GenAIInvocation): | ||
|
|
@@ -93,6 +96,9 @@ def __init__( | |
| self.cache_creation_input_tokens: int | None = None | ||
| self.cache_read_input_tokens: int | None = None | ||
| self.tool_definitions: list[ToolDefinition] | None = None | ||
| # Streaming timing fields (populated by stream wrappers) | ||
| self.ttfc_seconds: float | None = None | ||
| self.chunk_gap_seconds: list[float] = [] | ||
| self._start(self._get_base_attributes()) | ||
|
|
||
| def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: | ||
|
|
@@ -161,6 +167,10 @@ def _get_attributes(self) -> dict[str, Any]: | |
| _GEN_AI_REASONING_OUTPUT_TOKENS, | ||
| self.thinking_tokens, | ||
| ), | ||
| ( | ||
| _GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK, | ||
| self.ttfc_seconds, | ||
| ), | ||
| ) | ||
| attrs.update({k: v for k, v in optional_attrs if v is not None}) | ||
| return attrs | ||
|
|
@@ -172,6 +182,29 @@ def _get_metric_attributes(self) -> dict[str, Any]: | |
| attrs.update(self.metric_attributes) | ||
| return attrs | ||
|
|
||
| def _record_chunk_gap(self, gap: float) -> None: | ||
|
Contributor
Will this logic be pretty similar across different invocations? Probably can abstract this?
Contributor
Author
Same thread as If we keep the Protocol, those methods stay on InferenceInvocation since that is the only invocation that streams today. Will follow whichever direction you pick on the thread above and batch it with the next push.
Contributor
Author
Leaning toward keeping the Protocol for this PR, batching the decision with the next push. The trade I see:
Threads at Happy to flip to the base-class version if you prefer the single-site documentation. |
||
| """Buffer a time-per-output-chunk gap (in seconds). | ||
|
|
||
| Called by the stream wrapper as each chunk after the first arrives. | ||
| Buffered gaps are drained by ``_consume_streaming_timing`` when the | ||
| invocation stops. | ||
| """ | ||
| self.chunk_gap_seconds.append(gap) | ||
|
Contributor
Seems like there's no limit to this unbounded list, which might be a concern if the stream is really long. Can we record to the histogram directly every time instead?
Contributor
Author
One snag with inline recording: Options I see:
Contributor
Author
@lzchen I'm leaning toward option 4 (keep as is, document the memory shape) for this PR. The buffer lifetime is bounded by the invocation, and the per-invocation list length is the SDK's chunk count for that one call. For openai-v2 chat completion streams in scope here, that stays in the thousands of floats per invocation and is released at stop. Can add a short comment near Option 2 (inline record with snapshot attributes at first-chunk) is the cleaner path if we later target agent-style indefinite streams. Happy to switch to option 2 here if you would rather not defer it. |
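For readers weighing the two approaches in this thread, here is a rough sketch of what "inline record with snapshot attributes at first-chunk" could look like. It is not code from this PR: `HistogramLike`, `InlineGapRecorder`, `ListHistogram`, and the method names `on_first_chunk` / `on_chunk_gap` are all hypothetical, and the histogram is a plain stand-in rather than a real OpenTelemetry instrument.

```python
from __future__ import annotations

from typing import Any, Protocol


class HistogramLike(Protocol):
    """Hypothetical stand-in for a histogram instrument (assumption)."""

    def record(
        self, amount: float, attributes: dict[str, Any] | None = None
    ) -> None: ...


class InlineGapRecorder:
    """Hypothetical helper: records each gap immediately, keeps no list."""

    def __init__(self, histogram: HistogramLike) -> None:
        self._histogram = histogram
        self._snapshot: dict[str, Any] | None = None

    def on_first_chunk(self, attributes: dict[str, Any]) -> None:
        # Copy the metric attributes known when the first chunk arrives.
        self._snapshot = dict(attributes)

    def on_chunk_gap(self, gap: float) -> None:
        # Each gap goes straight to the histogram with the frozen snapshot,
        # so per-invocation memory does not grow with the chunk count.
        self._histogram.record(gap, attributes=self._snapshot)


class ListHistogram:
    """Test double that remembers every recorded point."""

    def __init__(self) -> None:
        self.points: list[tuple[float, dict[str, Any] | None]] = []

    def record(
        self, amount: float, attributes: dict[str, Any] | None = None
    ) -> None:
        self.points.append((amount, attributes))
```

One consequence of this particular sketch: any attribute set on the invocation after the first chunk is not attached to the recorded gaps, because only the snapshot is used. The buffered version in the diff avoids that by emitting everything at stop.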
||
|
|
||
| def _consume_streaming_timing( | ||
| self, | ||
| ) -> tuple[float | None, list[float]]: | ||
| """Return TTFC and chunk gaps, then reset them on the invocation. | ||
|
|
||
| Called by InvocationMetricsRecorder so the timing values are emitted | ||
| once and not held past finalization. | ||
| """ | ||
| ttfc = self.ttfc_seconds | ||
| gaps = self.chunk_gap_seconds | ||
| self.ttfc_seconds = None | ||
| self.chunk_gap_seconds = [] | ||
| return ttfc, gaps | ||
|
|
||
| def _get_metric_token_counts(self) -> dict[str, int]: | ||
| counts: dict[str, int] = {} | ||
| if self.input_tokens is not None: | ||
|
|
||
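The wrapper side of this change is not shown in the hunks above, so the following is only a minimal sketch of how a stream wrapper might feed `ttfc_seconds` and `_record_chunk_gap`, and how the recorder might later drain them through `_consume_streaming_timing`. `TimingTarget`, `FakeInvocation`, and `timed_stream` are hypothetical names, and measuring TTFC from the moment iteration starts is an assumption.

```python
from __future__ import annotations

import time
from typing import Callable, Iterable, Iterator, Protocol, TypeVar

T = TypeVar("T")


class TimingTarget(Protocol):
    """Hypothetical shape of the object a wrapper pushes timings into."""

    ttfc_seconds: float | None

    def _record_chunk_gap(self, gap: float) -> None: ...


class FakeInvocation:
    """Stand-in that mirrors the two methods added in the diff above."""

    def __init__(self) -> None:
        self.ttfc_seconds: float | None = None
        self.chunk_gap_seconds: list[float] = []

    def _record_chunk_gap(self, gap: float) -> None:
        self.chunk_gap_seconds.append(gap)

    def _consume_streaming_timing(self) -> tuple[float | None, list[float]]:
        # Hand the values back once, then reset so nothing is held afterwards.
        ttfc, gaps = self.ttfc_seconds, self.chunk_gap_seconds
        self.ttfc_seconds = None
        self.chunk_gap_seconds = []
        return ttfc, gaps


def timed_stream(
    chunks: Iterable[T],
    target: TimingTarget,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[T]:
    """Yield chunks unchanged while pushing timing values to ``target``."""
    start = clock()  # assumption: TTFC is measured from iteration start
    last: float | None = None
    for chunk in chunks:
        now = clock()
        if last is None:
            target.ttfc_seconds = now - start  # first chunk
        else:
            target._record_chunk_gap(now - last)  # every later chunk
        last = now
        yield chunk
```

Passing the clock as a parameter keeps the wrapper deterministic under test; a real wrapper would presumably also need to handle errors and early close, which this sketch leaves out.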