Add streaming timing metrics to generic stream wrappers #13
Pull request overview
Adds shared streaming timing support in the GenAI utility layer and wires OpenAI v2 chat stream wrappers to propagate those timings into inference telemetry.
Changes:
- Adds TTFC and per-output-chunk timing capture to sync/async stream wrapper base classes.
- Records new streaming timing histograms and the TTFC span attribute from InferenceInvocation.
- Updates OpenAI v2 chat stream wrappers, changelogs, and utility tests for the new timing behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py | Measures TTFC and per-chunk read durations in shared sync/async stream wrappers. |
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py | Records streaming timing histograms from invocation timing fields. |
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py | Defines histogram creation helpers for streaming timing metrics. |
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py | Adds timing fields and emits TTFC as an inference span attribute. |
| instrumentation/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py | Passes invocation start time into stream wrappers and copies measured timings back. |
| util/opentelemetry-util-genai/tests/test_stream.py | Adds sync/async stream wrapper timing tests. |
| util/opentelemetry-util-genai/tests/test_handler_metrics.py | Adds metric recorder tests for streaming timing histograms. |
| util/opentelemetry-util-genai/CHANGELOG.md | Documents utility streaming timing support. |
| instrumentation/opentelemetry-instrumentation-openai-v2/CHANGELOG.md | Documents OpenAI v2 chat streaming timing metrics. |
Comments suppressed due to low confidence (1)
util/opentelemetry-util-genai/src/opentelemetry/util/genai/stream.py:217
- The async wrapper has the same unbounded accumulation issue: _self_chunk_gaps stores one entry per output chunk until the stream finalizes. For large or long-lived streams this can grow without bound; prefer recording each gap immediately or passing timings through a bounded recorder.
self._self_chunk_gaps: list[float] = []
|
@lmolkova This implements the three items you mentioned in #8:
For chunk gaps I'm measuring blocking read time rather than wall clock between returns, so user-side processing doesn't inflate it. Let me know if that's not what you had in mind. Anthropic/Responses API don't use the ABC yet so they'll need follow-up work in a separate PR. |
MikeGoldsmith left a comment
Looks mostly okay, I've left a few comments we should resolve before accepting.
|
@MikeGoldsmith resolved all review threads, can you re-review when you have a moment?
eternalcuriouslearner left a comment
LGTM!! Couple of changes and a dumb performance question:
- Can we please add a VCR-backed test to verify the span and metrics?
- Can we extend the tests we have for the sync wrapper to the async wrapper?
- Any idea how large chunk_gap_seconds can grow for long-running streams or multi-turn conversations?
|
FYI if your PR needs a changelog entry, please update it to use towncrier #16 |
Apply the inter-chunk gap semantics requested by MikeGoldsmith on PR open-telemetry#13. The wrapper now tracks only `_self_last_chunk_at` and pushes each gap to the timing target via the new `_record_chunk_gap` method on the `_TimingTarget` protocol; `InferenceInvocation` implements the method and continues to drain buffered gaps via `_consume_streaming_timing` at stop. The `time_per_output_chunk` histogram now matches the semconv definition `t_after_read[n] - t_after_read[n-1]`. Document the TTFC contract on the `monotonic_start_s` property: the anchor is invocation creation, so instrumentations must not perform meaningful work between `start_inference()` and the wrapped SDK call. Re-baseline the affected `test_stream.py` cases to the inter-chunk math, assert exact gap values on the timing target, drop the obsolete wrapper-side list checks, and hoist the `unittest.mock.patch` imports to module scope. Add towncrier news fragments for PR open-telemetry#13 under `util/opentelemetry-util-genai/.changelog/13.added` and `instrumentation/opentelemetry-instrumentation-openai-v2/.changelog/13.added` per the changelog migration requested by aabmass.
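As a rough illustration of the inter-chunk semantics described in this commit, the sketch below stamps each chunk right after the read returns and pushes `t_after_read[n] - t_after_read[n-1]` to a timing target. It is not the wrapper's actual code: `GapRecorder`, `timed_chunks`, and the injectable `clock` parameter are hypothetical names used only for this example; only `ttfc_seconds`, `_record_chunk_gap`, `chunk_gap_seconds`, and `timeit.default_timer()` appear in the PR.

```python
import timeit
from typing import Callable, Iterable, Iterator, Optional


class GapRecorder:
    """Hypothetical timing target: holds TTFC and the inter-chunk gaps."""

    def __init__(self) -> None:
        self.ttfc_seconds: Optional[float] = None
        self.chunk_gap_seconds: list[float] = []

    def _record_chunk_gap(self, gap: float) -> None:
        self.chunk_gap_seconds.append(gap)


def timed_chunks(
    stream: Iterable[str],
    start_time_s: float,
    target: GapRecorder,
    clock: Callable[[], float] = timeit.default_timer,
) -> Iterator[str]:
    """Yield chunks unchanged while pushing timings into ``target``."""
    last_chunk_at: Optional[float] = None
    for chunk in stream:
        now = clock()  # timestamp taken right after the read returns
        if last_chunk_at is None:
            # First chunk: TTFC is measured from the invocation start.
            target.ttfc_seconds = now - start_time_s
        else:
            # Later chunks: gap between consecutive read completions.
            target._record_chunk_gap(now - last_chunk_at)
        last_chunk_at = now
        yield chunk
```

Because the gap is taken between consecutive read completions, any time the consumer spends processing a chunk before asking for the next one is included in the gap, which is the difference from timing only the blocking read.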
The towncrier news fragments at util/opentelemetry-util-genai/.changelog/13.added and instrumentation/opentelemetry-instrumentation-openai-v2/.changelog/13.added already cover the streaming TTFC and time_per_output_chunk metrics, so remove the duplicated Unreleased lines from the two CHANGELOG.md files. Keeping them would have caused the changelog workflow to fail because it blocks PRs that diff CHANGELOG.md against main.
@aabmass Migrated the entries to towncrier news fragments at |
def _record_chunk_gap(self, gap: float) -> None: ...
Shouldn't _consume_streaming_timing be defined here as well?
There was a problem hiding this comment.
_TimingTarget is the write side that the stream wrapper uses to push timing into an invocation: ttfc_seconds gets set once at first chunk and chunk gaps are pushed through _record_chunk_gap. _consume_streaming_timing is the read side, called by InvocationMetricsRecorder at finalize to drain the buffered values. The wrapper never calls it, so adding it to the Protocol would advertise a method the only caller of the Protocol does not need.
I can add it if you want both sides of the contract documented in one place, but on its own it would be unused by the wrapper.
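A minimal sketch of the write-side/read-side split described in this reply, assuming structural typing via `typing.Protocol`. `TimingTarget`, `StreamingInvocation`, `push_timings`, and the return shape of `_consume_streaming_timing` are illustrative guesses, not the signatures in the PR.

```python
from typing import Optional, Protocol


class TimingTarget(Protocol):
    """Write side only: what a stream wrapper needs in order to push timings."""

    ttfc_seconds: Optional[float]

    def _record_chunk_gap(self, gap: float) -> None: ...


class StreamingInvocation:
    """Hypothetical invocation; satisfies TimingTarget without inheriting it."""

    def __init__(self) -> None:
        self.ttfc_seconds: Optional[float] = None
        self.chunk_gap_seconds: list[float] = []

    def _record_chunk_gap(self, gap: float) -> None:
        self.chunk_gap_seconds.append(gap)

    def _consume_streaming_timing(self) -> tuple[Optional[float], list[float]]:
        """Read side (assumed shape): return buffered values and reset them."""
        ttfc, gaps = self.ttfc_seconds, self.chunk_gap_seconds
        self.ttfc_seconds, self.chunk_gap_seconds = None, []
        return ttfc, gaps


def push_timings(target: TimingTarget, ttfc: float, gaps: list[float]) -> None:
    """Stand-in for the wrapper: it only ever touches the Protocol surface."""
    target.ttfc_seconds = ttfc
    for gap in gaps:
        target._record_chunk_gap(gap)
```

Here `push_timings` type-checks against any object with the two write-side members, while the drain method stays an implementation detail of the invocation and its recorder.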
_logger = logging.getLogger(__name__)

class _TimingTarget(Protocol):
Does this mean we need all invocations to implement this? Shouldn't we just include this as part of GenAIInvocation?
There was a problem hiding this comment.
Two reasons it is a Protocol today:
- Keeps stream.py off the concrete GenAIInvocation. The wrapper only needs an object that can receive ttfc_seconds and accept _record_chunk_gap, nothing else from the invocation surface.
- Non-streaming invocation types (embeddings, tools, workflows) would have to carry unused ttfc_seconds / chunk-gap state if those fields lived on the base. With the Protocol, only invocations that actually stream declare it.
If you want it on GenAIInvocation anyway, I can move it in a follow-up. Trade is two extra fields on every invocation (None / empty on non-streaming ones) for a single place to document the contract. Let me know which way you want it.
        attrs.update(self.metric_attributes)
        return attrs

    def _record_chunk_gap(self, gap: float) -> None:
Will this logic be pretty similar across different invocations? Probably can abstract this?
There was a problem hiding this comment.
Same thread as _TimingTarget above. If we lift the streaming timing state up to GenAIInvocation, both _record_chunk_gap and _consume_streaming_timing move with it and the Protocol goes away.
If we keep the Protocol, those methods stay on InferenceInvocation since that is the only invocation that streams today. Will follow whichever direction you pick on the thread above and batch it with the next push.
There was a problem hiding this comment.
Leaning toward keeping the Protocol for this PR, batching the decision with the next push.
The trade I see:
- Protocol: stream.py stays decoupled from the concrete invocation types, and EmbeddingInvocation, ToolInvocation, AgentInvocation, WorkflowInvocation do not carry ttfc_seconds and the chunk-gap buffer as dead state. A future streaming invocation type implements the Protocol surface (ttfc_seconds: float | None attr and _record_chunk_gap method) and gets type-checking from pyright.
- On GenAIInvocation: one doc site for the streaming-timing contract, but every invocation pays for state it does not use, and there is no signal that distinguishes streaming from non-streaming invocations at the type level.
Threads at stream.py:46 and _inference_invocation.py:185 are the same call, so I will follow your direction here for all three.
Happy to flip to the base-class version if you prefer the single-site documentation.
        Buffered gaps are drained by ``_consume_streaming_timing`` when the
        invocation stops.
        """
        self.chunk_gap_seconds.append(gap)
Seems like there's no limit to this unbounded list, which might be a concern if the stream is really long. Can we record to the histogram directly every time instead?
There was a problem hiding this comment.
One snag with inline recording: gen_ai.response.model is only set after the first response chunk arrives, so gaps between chunks 1 and 2 would emit without it while later gaps have it (same stream split across two attribute keys).
Options I see:
1. Inline + drop response.model from this metric.
2. Inline + snapshot attributes at first-chunk time.
3. Sliding-window cap on the buffer, keep finalize record.
4. Keep as is, document the memory shape.
There was a problem hiding this comment.
@lzchen I'm leaning toward option 4 (keep as is, document the memory shape) for this PR. The buffer lifetime is bounded by the invocation, and the per-invocation list length is the SDK's chunk count for that one call. For openai-v2 chat completion streams in scope here, that stays in the thousands of floats per invocation and is released at stop. Can add a short comment near chunk_gap_seconds documenting the shape.
Option 2 (inline record with snapshot attributes at first-chunk) is the cleaner path if we later target agent-style indefinite streams. Happy to switch to option 2 here if you would rather not defer it.
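For reference, option 2 could look roughly like the sketch below: the attribute set is copied once when the first chunk arrives, and every later gap is recorded straight to the histogram with that frozen copy, so nothing is buffered. `FakeHistogram` and `InlineGapRecorder` are hypothetical stand-ins written for this example; a real histogram instrument would take the place of the fake one.

```python
from typing import Mapping, Optional


class FakeHistogram:
    """Stand-in for a metrics histogram; it only collects recorded points."""

    def __init__(self) -> None:
        self.points: list[tuple[float, dict[str, str]]] = []

    def record(self, value: float, attributes: Mapping[str, str]) -> None:
        self.points.append((value, dict(attributes)))


class InlineGapRecorder:
    """Hypothetical timing target that records each gap immediately."""

    def __init__(self, histogram: FakeHistogram) -> None:
        self._histogram = histogram
        self._attributes: Optional[dict[str, str]] = None

    def snapshot_attributes(self, attributes: Mapping[str, str]) -> None:
        """Call at first-chunk time, once the response model is known."""
        self._attributes = dict(attributes)  # copy, so later edits do not leak in

    def _record_chunk_gap(self, gap: float) -> None:
        if self._attributes is None:
            raise RuntimeError("snapshot_attributes() must run before the first gap")
        # Every gap in the stream is emitted with the same attribute set.
        self._histogram.record(gap, self._attributes)
```

The trade-off against option 4 is that attributes set after the first chunk never reach this metric, in exchange for constant memory per stream.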
…review Two changes from the second round of review on PR open-telemetry#13: 1. Reword the TTFC and time-per-output-chunk histogram descriptions so the measurement window is explicit. TTFC now spells out that the timer runs from the client issuing the generation request to the first chunk being received in the response stream. time-per-output-chunk now clarifies that the value is recorded for every chunk after the first, measured end-to-end between consecutive chunks. 2. Validate that start_time_s is provided whenever a timing_target is wired up. TTFC has no meaning without a start time, so passing a timing target alone is a misconfiguration. SyncStreamWrapper and AsyncStreamWrapper now raise ValueError at construction rather than silently emitting no TTFC. start_time_s stays optional when no timing_target is provided so the wrapper is still usable as a plain chunk-processing wrapper. Tests cover the new failure mode on both wrappers.
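The constructor check described in point 2 can be sketched as follows. `TimedStreamBase` is a hypothetical stand-in rather than the real `SyncStreamWrapper` / `AsyncStreamWrapper`, and the error message text is an assumption; only the parameter names `timing_target` and `start_time_s` and the `ValueError` come from the commit message.

```python
from typing import Any, Iterable, Optional


class TimedStreamBase:
    """Hypothetical wrapper base showing only the construction-time check."""

    def __init__(
        self,
        stream: Iterable[Any],
        timing_target: Optional[Any] = None,
        start_time_s: Optional[float] = None,
    ) -> None:
        # TTFC is measured from start_time_s, so a timing target without a
        # start time is a misconfiguration: fail loudly at construction.
        if timing_target is not None and start_time_s is None:
            raise ValueError(
                "start_time_s is required when timing_target is provided"
            )
        self.stream = stream
        self.timing_target = timing_target
        self.start_time_s = start_time_s
```

Constructing the wrapper with neither argument still works, which matches the stated goal of keeping it usable as a plain chunk-processing wrapper.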
6ea537b to 78917c6
78917c6 to 4d89828
4d89828 to 6f465ca
|
@MikeGoldsmith @eternalcuriouslearner rebased this PR onto main (eaca532) to clear the conflict from PR #60, then squashed the history into a single commit (6f465ca) so the diff is easier to scan. CI is green across all 82 checks on the new head.
The single conflict was the OpenAI changelog file. PR #60 renamed
One side effect of the squash is that previous inline comments are showing as outdated on the new commit. The line context is unchanged, so the resolutions should still hold against
If everything looks good, could you take another pass when you have a moment?
@lzchen thanks for the deep review. Your seven inline comments are all addressed in the squashed commit, and my replies to each are still on the original threads (now marked outdated by the force push).
Records two streaming-specific histograms on inference invocations: gen_ai.client.operation.time_to_first_chunk and gen_ai.client.operation.time_per_output_chunk. Adds a _TimingTarget protocol on the shared stream wrappers in opentelemetry-util-genai so provider instrumentations can opt into the timing path without duplicating it, and wires the OpenAI instrumentation through it. Includes unit tests for both the util-genai stream wrappers and the OpenAI stream path, asserting that the time_to_first_chunk span attribute and the two histograms are emitted with the expected attribute set. Towncrier changelog entries live under util/opentelemetry-util-genai/.changelog/13.added and instrumentation/opentelemetry-instrumentation-genai-openai/.changelog/13.added.
6f465ca to dbcafb1
Closes #8
What this does
Adds three streaming timing measurements to the shared utils layer:
- gen_ai.client.operation.time_to_first_chunk histogram
- gen_ai.response.time_to_first_chunk span attribute
- gen_ai.client.operation.time_per_output_chunk histogram (one data point per inter-chunk gap)
All timing logic lives in the stream wrapper base classes (SyncStreamWrapper / AsyncStreamWrapper in util/opentelemetry-util-genai). The OpenAI chat wrappers just pass through the invocation start time and copy measured values back, keeping provider-specific code minimal.
How timing works
timeit.default_timer() around the blocking next() / anext() call
Scope
This wires up timing for OpenAI chat completions streams (which use the shared ABC). Anthropic and the Responses API have their own stream wrappers that don't extend the ABC, so they will need separate follow-up work.
Testing