diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 947eca3806..baa67fbd9d 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -15,10 +15,15 @@ class SpanBatcher(Batcher["StreamedSpan"]): - # TODO[span-first]: size-based flushes - # TODO[span-first]: adjust flush/drop defaults + # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is + # a bit of a buffer for spans that appear between setting the flush event + # and actually flushing the buffer. + # + # The max limits are all per trace. + MAX_ENVELOPE_SIZE = 1000 # spans MAX_BEFORE_FLUSH = 1000 - MAX_BEFORE_DROP = 5000 + MAX_BEFORE_DROP = 2000 + MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB FLUSH_WAIT_TIME = 5.0 TYPE = "span" @@ -35,6 +40,7 @@ def __init__( # envelope. # trace_id -> span buffer self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) + self._running_size: dict[str, int] = defaultdict(lambda: 0) self._capture_func = capture_func self._record_lost_func = record_lost_func self._running = True @@ -45,16 +51,12 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None - def get_size(self) -> int: - # caller is responsible for locking before checking this - return sum(len(buffer) for buffer in self._span_buffer.values()) - def add(self, span: "StreamedSpan") -> None: if not self._ensure_thread() or self._flusher is None: return None with self._lock: - size = self.get_size() + size = len(self._span_buffer[span.trace_id]) if size >= self.MAX_BEFORE_DROP: self._record_lost_func( reason="queue_overflow", @@ -64,18 +66,36 @@ def add(self, span: "StreamedSpan") -> None: return None self._span_buffer[span.trace_id].append(span) + self._running_size[span.trace_id] += self._estimate_size(span) + if size + 1 >= self.MAX_BEFORE_FLUSH: self._flush_event.set() + return + + if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: + self._flush_event.set() + return + + 
@staticmethod + def _estimate_size(item: "StreamedSpan") -> int: + # Rough estimate of serialized span size that's quick to compute. + # 210 is the rough size of the payload without attributes, and we + # estimate additional 70 bytes on top of that per attribute. + return 210 + 70 * len(item._attributes) @staticmethod def _to_transport_format(item: "StreamedSpan") -> "Any": # TODO[span-first] res: "dict[str, Any]" = { + "trace_id": item.trace_id, "span_id": item.span_id, "name": item._name, "status": item._status, } + if item._parent_span_id: + res["parent_span_id"] = item._parent_span_id + if item._attributes: res["attributes"] = { k: serialize_attribute(v) for (k, v) in item._attributes.items() @@ -86,7 +106,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": def _flush(self) -> None: with self._lock: if len(self._span_buffer) == 0: - return None + return envelopes = [] for trace_id, spans in self._span_buffer.items(): @@ -95,34 +115,40 @@ def _flush(self) -> None: # dsc = spans[0].dynamic_sampling_context() dsc = None - envelope = Envelope( - headers={ - "sent_at": format_timestamp(datetime.now(timezone.utc)), - "trace": dsc, - } - ) - - envelope.add_item( - Item( - type="span", - content_type="application/vnd.sentry.items.span.v2+json", + # Max per envelope is 1000, so if we happen to have more than + # 1000 spans in one bucket, we'll need to separate them. 
+ for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE): + end = min(start + self.MAX_ENVELOPE_SIZE, len(spans)) + + envelope = Envelope( headers={ - "item_count": len(spans), - }, - payload=PayloadRef( - json={ - "items": [ - self._to_transport_format(span) - for span in spans - ] - } - ), + "sent_at": format_timestamp(datetime.now(timezone.utc)), + "trace": dsc, + } + ) + + envelope.add_item( + Item( + type=self.TYPE, + content_type=self.CONTENT_TYPE, + headers={ + "item_count": end - start, + }, + payload=PayloadRef( + json={ + "items": [ + self._to_transport_format(spans[j]) + for j in range(start, end) + ] + } + ), + ) ) - ) - envelopes.append(envelope) + envelopes.append(envelope) self._span_buffer.clear() + self._running_size.clear() for envelope in envelopes: self._capture_func(envelope) diff --git a/sentry_sdk/traces.py b/sentry_sdk/traces.py index 71257f7ab3..08d04704a2 100644 --- a/sentry_sdk/traces.py +++ b/sentry_sdk/traces.py @@ -14,9 +14,12 @@ from sentry_sdk.utils import format_attribute, logger if TYPE_CHECKING: - from typing import Any, Optional, Union + from typing import Any, Callable, Optional, ParamSpec, TypeVar, Union from sentry_sdk._types import Attributes, AttributeValue + P = ParamSpec("P") + R = TypeVar("R") + class SpanStatus(str, Enum): OK = "ok" @@ -235,6 +238,14 @@ def __repr__(self) -> str: f"active={self._active})>" ) + def __enter__(self) -> "StreamedSpan": + return self + + def __exit__( + self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" + ) -> None: + pass + def get_attributes(self) -> "Attributes": return self._attributes @@ -306,6 +317,14 @@ def __init__(self) -> None: def __repr__(self) -> str: return f"<{self.__class__.__name__}(sampled={self.sampled})>" + def __enter__(self) -> "NoOpStreamedSpan": + return self + + def __exit__( + self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" + ) -> None: + pass + def get_attributes(self) -> "Attributes": return {} @@ -349,3 +368,74 @@ def 
trace_id(self) -> str: @property def sampled(self) -> "Optional[bool]": return False + + +def trace( + func: "Optional[Callable[P, R]]" = None, + *, + name: "Optional[str]" = None, + attributes: "Optional[dict[str, Any]]" = None, + active: bool = True, +) -> "Union[Callable[P, R], Callable[[Callable[P, R]], Callable[P, R]]]": + """ + Decorator to start a span around a function call. + + This decorator automatically creates a new span when the decorated function + is called, and finishes the span when the function returns or raises an exception. + + :param func: The function to trace. When used as a decorator without parentheses, + this is the function being decorated. When used with parameters (e.g., + ``@trace(name="custom")``), this should be None. + :type func: Callable or None + + :param name: The human-readable name/description for the span. If not provided, + defaults to the function name. This provides more specific details about + what the span represents (e.g., "GET /api/users", "process_user_data"). + :type name: str or None + + :param attributes: A dictionary of key-value pairs to add as attributes to the span. + Attribute values must be strings, integers, floats, or booleans. These + attributes provide additional context about the span's execution. + :type attributes: dict[str, Any] or None + + :param active: Controls whether spans started while this span is running + will automatically become its children. That's the default behavior. If + you want to create a span that shouldn't have any children (unless + provided explicitly via the `parent_span` argument), set this to False. + :type active: bool + + :returns: When used as ``@trace``, returns the decorated function. When used as + ``@trace(...)`` with parameters, returns a decorator function. 
+ :rtype: Callable or decorator function + + Example:: + + import sentry_sdk + + # Simple usage with default values + @sentry_sdk.trace + def process_data(): + # Function implementation + pass + + # With custom parameters + @sentry_sdk.trace( + name="Get user data", + attributes={"postgres": True} + ) + def make_db_query(sql): + # Function implementation + pass + """ + from sentry_sdk.tracing_utils import create_streaming_span_decorator + + decorator = create_streaming_span_decorator( + name=name, + attributes=attributes, + active=active, + ) + + if func: + return decorator(func) + else: + return decorator diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index c1d6c44535..54c3dcc6f5 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -942,6 +942,57 @@ def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any": return span_decorator +def create_streaming_span_decorator( + name: "Optional[str]" = None, + attributes: "Optional[dict[str, Any]]" = None, + active: bool = True, +) -> "Any": + """ + Create a span creating decorator that can wrap both sync and async functions. + """ + + def span_decorator(f: "Any") -> "Any": + """ + Decorator to create a span for the given function. 
+ """ + + @functools.wraps(f) + async def async_wrapper(*args: "Any", **kwargs: "Any") -> "Any": + span_name = name or qualname_from_function(f) or "" + + with start_streaming_span( + name=span_name, attributes=attributes, active=active + ): + result = await f(*args, **kwargs) + return result + + try: + async_wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined] + except Exception: + pass + + @functools.wraps(f) + def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any": + span_name = name or qualname_from_function(f) or "" + + with start_streaming_span( + name=span_name, attributes=attributes, active=active + ): + return f(*args, **kwargs) + + try: + sync_wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined] + except Exception: + pass + + if inspect.iscoroutinefunction(f): + return async_wrapper + else: + return sync_wrapper + + return span_decorator + + def get_current_span(scope: "Optional[sentry_sdk.Scope]" = None) -> "Optional[Span]": """ Returns the currently active span if there is one running, otherwise `None` @@ -1317,6 +1368,9 @@ def add_sentry_baggage_to_headers( LOW_QUALITY_TRANSACTION_SOURCES, SENTRY_TRACE_HEADER_NAME, ) +from sentry_sdk.traces import ( + start_span as start_streaming_span, +) if TYPE_CHECKING: from sentry_sdk.tracing import Span