From 30cb3f99ec4a432d665437b70ef4172941d3ac85 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Thu, 19 Mar 2026 22:03:16 -0700 Subject: [PATCH 1/6] feat(tracing): expose public flush_traces() API for long-running workers In long-running processes (Celery, FastAPI background tasks, RQ), traces are buffered indefinitely because the process never exits to trigger shutdown flushing. This adds flush_traces() as a public API that delegates to the trace provider's force_flush(), allowing users to explicitly flush traces at the end of each task. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/agents/__init__.py | 2 ++ src/agents/tracing/__init__.py | 11 +++++++++++ src/agents/tracing/provider.py | 4 ++++ tests/test_trace_processor.py | 21 +++++++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/src/agents/__init__.py b/src/agents/__init__.py index 214e814d3e..54c739bbda 100644 --- a/src/agents/__init__.py +++ b/src/agents/__init__.py @@ -203,6 +203,7 @@ add_trace_processor, agent_span, custom_span, + flush_traces, function_span, gen_span_id, gen_trace_id, @@ -451,6 +452,7 @@ def enable_verbose_stdout_logging(): "add_trace_processor", "agent_span", "custom_span", + "flush_traces", "function_span", "generation_span", "get_current_span", diff --git a/src/agents/tracing/__init__.py b/src/agents/tracing/__init__.py index 9f5e4f7568..47a3cc2643 100644 --- a/src/agents/tracing/__init__.py +++ b/src/agents/tracing/__init__.py @@ -41,6 +41,7 @@ __all__ = [ "add_trace_processor", "agent_span", + "flush_traces", "custom_span", "function_span", "generation_span", @@ -108,3 +109,13 @@ def set_tracing_export_api_key(api_key: str) -> None: Set the OpenAI API key for the backend exporter. """ default_exporter().set_api_key(api_key) + + +def flush_traces() -> None: + """Force an immediate flush of all buffered traces and spans. + + Call this at the end of each task in long-running worker processes + (Celery, FastAPI background tasks, RQ, etc.) to ensure traces are + exported to the backend rather than remaining buffered indefinitely. + """ + get_trace_provider().force_flush() diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index 90ea85cbf0..7f3c1652ae 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -365,6 +365,10 @@ def create_span( trace_metadata=trace_metadata, ) + def force_flush(self) -> None: + """Force all processors to flush their buffers immediately.""" + self._multi_processor.force_flush() + def shutdown(self) -> None: if self._disabled: return diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index ad061d7995..82b6405946 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -835,3 +835,24 @@ def test_truncate_string_for_json_limit_handles_escape_heavy_input(): assert truncated.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) assert exporter._value_json_size_bytes(truncated) <= max_bytes exporter.close() + + +def test_flush_traces_calls_provider_force_flush(): + """Test that flush_traces() delegates to the global trace provider's force_flush().""" + from unittest.mock import MagicMock, patch + + mock_provider = MagicMock() + + with patch("agents.tracing.get_trace_provider", return_value=mock_provider): + from agents.tracing import flush_traces + + flush_traces() + + mock_provider.force_flush.assert_called_once() + + +def test_flush_traces_importable_from_agents(): + """Test that flush_traces is importable from the top-level agents package.""" + from agents import flush_traces + + assert callable(flush_traces) From f29ffb7a27e6a352059974ab331c1f4fd76179ef Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Thu, 19 Mar 2026 22:50:29 -0700 Subject: [PATCH 2/6] fix(tracing): add force_flush to TraceProvider ABC for mypy compatibility The flush_traces() function calls get_trace_provider().force_flush(), but TraceProvider (the abstract base class) did not declare force_flush as an abstract method. mypy correctly flagged this as attr-defined error since only DefaultTraceProvider had the concrete implementation. Co-Authored-By: Claude Opus 4.6 --- src/agents/tracing/provider.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index 7f3c1652ae..5e32c84e96 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -188,6 +188,10 @@ def create_span( ) -> Span[TSpanData]: """Create a new span.""" + @abstractmethod + def force_flush(self) -> None: + """Force all registered processors to flush their buffers immediately.""" + @abstractmethod def shutdown(self) -> None: """Clean up any resources used by the provider.""" From 1ed604d6a12ee6259094434f60f5037fc38d2c73 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Fri, 20 Mar 2026 07:32:39 -0700 Subject: [PATCH 3/6] fix(tracing): make force_flush/shutdown non-abstract for backward compatibility Addresses Codex review feedback: existing TraceProvider subclasses that don't override force_flush or shutdown will no longer break at instantiation. Both methods now default to no-ops on the base class. Adds a test proving a minimal custom provider works with flush_traces(). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/agents/tracing/provider.py | 15 ++++++++--- tests/test_trace_processor.py | 49 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index 5e32c84e96..2d2056567d 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -188,13 +188,20 @@ def create_span( ) -> Span[TSpanData]: """Create a new span.""" - @abstractmethod def force_flush(self) -> None: - """Force all registered processors to flush their buffers immediately.""" + """Force all registered processors to flush their buffers immediately. + + The default implementation is a no-op so that existing + ``TraceProvider`` subclasses continue to work without modification. + Override this in your provider if you need custom flush behaviour. + """ - @abstractmethod def shutdown(self) -> None: - """Clean up any resources used by the provider.""" + """Clean up any resources used by the provider. + + The default implementation is a no-op for the same backward- + compatibility reasons as :meth:`force_flush`. + """ class DefaultTraceProvider(TraceProvider): diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index 82b6405946..055dba87a7 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -856,3 +856,52 @@ def test_flush_traces_importable_from_agents(): from agents import flush_traces assert callable(flush_traces) + + +def test_flush_traces_tolerates_provider_without_override(): + """Test that flush_traces() is safe with a TraceProvider that does not override force_flush.""" + from unittest.mock import patch + + from agents.tracing import flush_traces + from agents.tracing.provider import TraceProvider + + class MinimalProvider(TraceProvider): + """A provider that only implements the required abstract methods.""" + + def register_processor(self, processor): + pass + + def set_processors(self, processors): + pass + + def get_current_trace(self): + return None + + def get_current_span(self): + return None + + def set_disabled(self, disabled): + pass + + def time_iso(self): + return "" + + def gen_trace_id(self): + return "t" + + def gen_span_id(self): + return "s" + + def gen_group_id(self): + return "g" + + def create_trace(self, name, **kwargs): + raise NotImplementedError + + def create_span(self, span_data, **kwargs): + raise NotImplementedError + + provider = MinimalProvider() + with patch("agents.tracing.get_trace_provider", return_value=provider): + # Should not raise - force_flush has a default no-op implementation + flush_traces() From 25377a406ab6f4a7ade7acc70448ce17d3bcea05 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Fri, 20 Mar 2026 09:55:44 -0700 Subject: [PATCH 4/6] fix(tracing): guard flush_traces against providers lacking force_flush Add hasattr check so flush_traces() is safe to call even with custom TraceProvider implementations that predate the force_flush method. --- src/agents/tracing/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/agents/tracing/__init__.py b/src/agents/tracing/__init__.py index 47a3cc2643..4f4ee4a88e 100644 --- a/src/agents/tracing/__init__.py +++ b/src/agents/tracing/__init__.py @@ -118,4 +118,6 @@ def flush_traces() -> None: (Celery, FastAPI background tasks, RQ, etc.) to ensure traces are exported to the backend rather than remaining buffered indefinitely. """ - get_trace_provider().force_flush() + provider = get_trace_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() From afbc8afeaf7e4af95bde91528bb82924b645c492 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:43:24 -0700 Subject: [PATCH 5/6] fix: respect disabled flag in DefaultTraceProvider.force_flush force_flush() now checks the disabled flag before flushing processors, matching the behavior of shutdown(). Prevents buffered telemetry from being exported after set_tracing_disabled(True). Co-Authored-By: Claude Opus 4.6 --- src/agents/tracing/provider.py | 3 +++ tests/test_trace_processor.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index 2d2056567d..f332c31eb1 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -378,6 +378,9 @@ def create_span( def force_flush(self) -> None: """Force all processors to flush their buffers immediately.""" + self._refresh_disabled_flag() + if self._disabled: + return self._multi_processor.force_flush() def shutdown(self) -> None: diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index 055dba87a7..5d92be5c4f 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -905,3 +905,19 @@ def create_span(self, span_data, **kwargs): with patch("agents.tracing.get_trace_provider", return_value=provider): # Should not raise - force_flush has a default no-op implementation flush_traces() + + +def test_force_flush_respects_disabled_flag(): + """Test that force_flush() skips processing when tracing is disabled.""" + from unittest.mock import MagicMock + + from agents.tracing.provider import DefaultTraceProvider + + provider = DefaultTraceProvider() + mock_processor = MagicMock() + provider.register_processor(mock_processor) + + provider.set_disabled(True) + provider.force_flush() + + mock_processor.force_flush.assert_not_called() From 21d84de5124a99a57af1fa09af1e9ae2df07c16d Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:30:14 -0700 Subject: [PATCH 6/6] fix: resolve lint B027 and mypy override errors Add explicit `pass` to TraceProvider.force_flush/shutdown to satisfy ruff B027 (empty method in abstract class without @abstractmethod). Expand MinimalProvider test stub signatures to match the parent class. Co-Authored-By: Claude Opus 4.6 --- src/agents/tracing/provider.py | 2 ++ tests/test_trace_processor.py | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index f332c31eb1..835e78a978 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -195,6 +195,7 @@ def force_flush(self) -> None: ``TraceProvider`` subclasses continue to work without modification. Override this in your provider if you need custom flush behaviour. """ + pass def shutdown(self) -> None: """Clean up any resources used by the provider. @@ -202,6 +203,7 @@ def shutdown(self) -> None: The default implementation is a no-op for the same backward- compatibility reasons as :meth:`force_flush`. """ + pass class DefaultTraceProvider(TraceProvider): diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index 5d92be5c4f..d937c03b95 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -895,10 +895,24 @@ def gen_span_id(self): def gen_group_id(self): return "g" - def create_trace(self, name, **kwargs): + def create_trace( # type: ignore[override] + self, + name: str, + trace_id: str | None = None, + group_id: str | None = None, + metadata: dict[str, Any] | None = None, + disabled: bool = False, + tracing: Any = None, + ) -> Any: raise NotImplementedError - def create_span(self, span_data, **kwargs): + def create_span( # type: ignore[override] + self, + span_data: Any, + span_id: str | None = None, + parent: Any = None, + disabled: bool = False, + ) -> Any: raise NotImplementedError provider = MinimalProvider()