From 5d41d8e1e9f1aa09e071837b13ab44fefac4aaa0 Mon Sep 17 00:00:00 2001 From: akash-vijay-kv Date: Thu, 5 Mar 2026 10:11:42 +0530 Subject: [PATCH 1/5] [NET-333] feat: Add SDK support for prompt management feature --- netra/__init__.py | 11 +++- netra/prompts/__init__.py | 3 + netra/prompts/api.py | 40 +++++++++++ netra/prompts/client.py | 135 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 netra/prompts/__init__.py create mode 100644 netra/prompts/api.py create mode 100644 netra/prompts/client.py diff --git a/netra/__init__.py b/netra/__init__.py index 6989f8f..033f973 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -13,6 +13,7 @@ from netra.instrumentation import init_instrumentations from netra.instrumentation.instruments import NetraInstruments from netra.logging_utils import configure_package_logging +from netra.prompts import Prompts from netra.session_manager import ConversationType, SessionManager from netra.simulation import Simulation from netra.span_wrapper import ActionModel, SpanType, SpanWrapper, UsageModel @@ -23,6 +24,7 @@ "Netra", "UsageModel", "ActionModel", + "Prompts", ] logger = logging.getLogger(__name__) @@ -133,6 +135,13 @@ def init( logger.warning("Failed to initialize dashboard client: %s", e, exc_info=True) cls.dashboard = None # type:ignore[attr-defined] + # Initialize prompts client and expose as class attribute + try: + cls.prompts = Prompts(cfg) # type:ignore[attr-defined] + except Exception as e: + logger.warning("Failed to initialize prompts client: %s", e, exc_info=True) + cls.prompts = None # type:ignore[attr-defined] + # Initialize simulation client and expose as class attribute try: cls.simulation = Simulation(cfg) # type:ignore[attr-defined] @@ -313,4 +322,4 @@ def start_span( return SpanWrapper(name, attributes, module_name, as_type=as_type) -__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore"] +__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore", "Prompts"] diff --git a/netra/prompts/__init__.py b/netra/prompts/__init__.py new file mode 100644 index 0000000..33e400a --- /dev/null +++ b/netra/prompts/__init__.py @@ -0,0 +1,3 @@ +from netra.prompts.api import Prompts + +__all__ = ["Prompts"] diff --git a/netra/prompts/api.py b/netra/prompts/api.py new file mode 100644 index 0000000..c85ed71 --- /dev/null +++ b/netra/prompts/api.py @@ -0,0 +1,40 @@ +import logging +from typing import Any + +from netra.config import Config +from netra.prompts.client import PromptsHttpClient + +logger = logging.getLogger(__name__) + + +class Prompts: + """ + Public entry-point exposed as Netra.prompts + """ + + def __init__(self, cfg: Config) -> None: + """ + Initialize the Prompts client. + + Args: + cfg: Configuration object containing API key and base URL + """ + self._config = cfg + self._client = PromptsHttpClient(cfg) + + def get_prompt(self, name: str, label: str = "production") -> Any: + """ + Fetch a prompt version by name and label. + + Args: + name: Name of the prompt + label: Label of the prompt version (default: "production") + + Returns: + Prompt version data or empty dict if not found + """ + if not name: + logger.error("netra.prompts: name is required to fetch a prompt") + return None + + return self._client.get_prompt_version(prompt_name=name, label=label) diff --git a/netra/prompts/client.py b/netra/prompts/client.py new file mode 100644 index 0000000..f23de36 --- /dev/null +++ b/netra/prompts/client.py @@ -0,0 +1,135 @@ +import logging +import os +from typing import Any, Dict, Optional + +import httpx + +from netra.config import Config + +logger = logging.getLogger(__name__) + + +class PromptsHttpClient: + """ + Internal HTTP client for prompts APIs. + """ + + def __init__(self, config: Config) -> None: + """ + Initialize the prompts HTTP client. + + Args: + config: Configuration object containing API key and base URL + """ + self._client: Optional[httpx.Client] = self._create_client(config) + + def _create_client(self, config: Config) -> Optional[httpx.Client]: + """ + Create and configure the HTTP client. + + Args: + config: Configuration object containing API key and base URL + + Returns: + Configured HTTP client or None if initialization fails + """ + endpoint = (config.otlp_endpoint or "").strip() + if not endpoint: + logger.error("netra.prompts: NETRA_OTLP_ENDPOINT is required for prompts APIs") + return None + + base_url = self._resolve_base_url(endpoint) + headers = self._build_headers(config) + timeout = self._get_timeout() + + try: + return httpx.Client(base_url=base_url, headers=headers, timeout=timeout) + except Exception as exc: + logger.error("netra.prompts: Failed to initialize prompts HTTP client: %s", exc) + return None + + def _resolve_base_url(self, endpoint: str) -> str: + """ + Resolve the base URL by removing /telemetry suffix if present. + + Args: + endpoint: The endpoint URL + + Returns: + Resolved base URL + """ + base_url = endpoint.rstrip("/") + if base_url.endswith("/telemetry"): + base_url = base_url[: -len("/telemetry")] + return base_url + + def _build_headers(self, config: Config) -> Dict[str, str]: + """ + Build HTTP headers for API requests. + + Args: + config: Configuration object containing API key and base URL + + Returns: + Dictionary of HTTP headers + """ + headers: Dict[str, str] = dict(config.headers or {}) + api_key = config.api_key + if api_key: + headers["x-api-key"] = api_key + return headers + + def _get_timeout(self) -> float: + """ + Get the timeout value from environment variable or use default. + + Returns: + Timeout value in seconds + """ + timeout_env = os.getenv("NETRA_PROMPTS_TIMEOUT") + if not timeout_env: + return 10.0 + try: + return float(timeout_env) + except ValueError: + logger.warning( + "netra.prompts: Invalid NETRA_PROMPTS_TIMEOUT value '%s', using default 10.0", + timeout_env, + ) + return 10.0 + + def get_prompt_version(self, prompt_name: str, label: str) -> Any: + """ + Fetch a prompt version by name and label. + + Args: + prompt_name: Name of the prompt + label: Label of the prompt version + + Returns: + Prompt version data or empty dict if not found + """ + if not self._client: + logger.error( + "netra.prompts: Prompts client is not initialized; cannot fetch prompt version for '%s'", + prompt_name, + ) + return {} + + try: + url = "/sdk/prompts/version" + payload: Dict[str, Any] = {"promptName": prompt_name, "label": label} + response = self._client.post(url, json=payload) + response.raise_for_status() + data = response.json() + if isinstance(data, dict) and "data" in data: + return data.get("data", {}) + return data + except Exception as exc: + logger.error( + "netra.prompts: Failed to fetch prompt version for '%s' (label=%s): %s", + prompt_name, + label, + exc, + ) + return {} From 953d34c259726bf63ca7d2434ca411683f9386e4 Mon Sep 17 00:00:00 2001 From: akash-vijay-kv Date: Fri, 13 Mar 2026 13:38:49 +0530 Subject: [PATCH 2/5] chore: Update version and changelog --- CHANGELOG.md | 7 ++++++- netra/version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b35a0a4..d18336d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on Keep a Changelog and this project adheres to Semantic Versioning. +## [0.1.74] - 2026-10-14 + +- Add utility for prompt management + + ## [0.1.73] - 2026-10-14 - Extended dependency support for opentelemetry and traceloop-sdk @@ -195,4 +200,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver - Added utility to set input and output data for any active span in a trace -[0.1.73]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main +[0.1.74]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main diff --git a/netra/version.py b/netra/version.py index aaedf09..a52a5aa 100644 --- a/netra/version.py +++ b/netra/version.py @@ -1 +1 @@ -__version__ = "0.1.73" +__version__ = "0.1.74" diff --git a/pyproject.toml b/pyproject.toml index b093288..e902228 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "netra-sdk" -version = "0.1.73" +version = "0.1.74" description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments." authors = [ {name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"} From 05adcfc102fc763ba1cb668fa4e680cfb85043f6 Mon Sep 17 00:00:00 2001 From: Akash Vijay Date: Wed, 18 Mar 2026 16:21:52 +0530 Subject: [PATCH 3/5] [NET-424] fix: Add proxy support for context manager protocol in openai instrumentation (#221) --- netra/instrumentation/litellm/wrappers.py | 26 +++++++++++++++++++++++ netra/instrumentation/openai/wrappers.py | 26 +++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/netra/instrumentation/litellm/wrappers.py b/netra/instrumentation/litellm/wrappers.py index 9e4f944..4a1e93e 100644 --- a/netra/instrumentation/litellm/wrappers.py +++ b/netra/instrumentation/litellm/wrappers.py @@ -358,6 +358,19 @@ def _ensure_choice(self, index: int) -> None: else: self._complete_response["choices"].append({"text": ""}) + def __enter__(self) -> "StreamingWrapper": + if hasattr(self.__wrapped__, "__enter__"): + self.__wrapped__.__enter__() + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if hasattr(self.__wrapped__, "__exit__"): + self.__wrapped__.__exit__(exc_type, exc_val, exc_tb) + if exc_type is not None: + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + self._span.record_exception(exc_val) + self._span.end() + def __iter__(self) -> Iterator[Any]: return self @@ -451,6 +464,19 @@ def _ensure_choice(self, index: int) -> None: else: self._complete_response["choices"].append({"text": ""}) + async def __aenter__(self) -> "AsyncStreamingWrapper": + if hasattr(self.__wrapped__, "__aenter__"): + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if hasattr(self.__wrapped__, "__aexit__"): + await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb) + if exc_type is not None: + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + self._span.record_exception(exc_val) + self._span.end() + def __aiter__(self) -> AsyncIterator[Any]: return self diff --git a/netra/instrumentation/openai/wrappers.py b/netra/instrumentation/openai/wrappers.py index a26d3f2..971d98e 100644 --- a/netra/instrumentation/openai/wrappers.py +++ b/netra/instrumentation/openai/wrappers.py @@ -307,6 +307,19 @@ def _ensure_choice(self, index: int) -> None: else: self._complete_response["choices"].append({"text": ""}) + def __enter__(self) -> "StreamingWrapper": + if hasattr(self.__wrapped__, "__enter__"): + self.__wrapped__.__enter__() + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if hasattr(self.__wrapped__, "__exit__"): + self.__wrapped__.__exit__(exc_type, exc_val, exc_tb) + if exc_type is not None: + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + self._span.record_exception(exc_val) + self._span.end() + def __iter__(self) -> Iterator[Any]: return self @@ -409,6 +422,19 @@ def _ensure_choice(self, index: int) -> None: else: self._complete_response["choices"].append({"text": ""}) + async def __aenter__(self) -> "AsyncStreamingWrapper": + if hasattr(self.__wrapped__, "__aenter__"): + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if hasattr(self.__wrapped__, "__aexit__"): + await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb) + if exc_type is not None: + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + self._span.record_exception(exc_val) + self._span.end() + def __aiter__(self) -> AsyncIterator[Any]: return self From 360ee20b05cc761334ac0bc74fb8c0b9e2d66f1d Mon Sep 17 00:00:00 2001 From: akash-vijay-kv Date: Wed, 18 Mar 2026 17:58:37 +0530 Subject: [PATCH 4/5] [NET-447] feat: Add root instrument filter processor and default instruments --- netra/__init__.py | 20 +- netra/instrumentation/__init__.py | 22 +- netra/instrumentation/instruments.py | 50 +++++ netra/processors/__init__.py | 2 + .../root_instrument_filter_processor.py | 205 ++++++++++++++++++ netra/tracer.py | 13 +- 6 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 netra/processors/root_instrument_filter_processor.py diff --git a/netra/__init__.py b/netra/__init__.py index 033f973..3d8abb5 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -11,7 +11,7 @@ from netra.dashboard import Dashboard from netra.evaluation import Evaluation from netra.instrumentation import init_instrumentations -from netra.instrumentation.instruments import NetraInstruments +from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS_FOR_ROOT, NetraInstruments from netra.logging_utils import configure_package_logging from netra.prompts import Prompts from netra.session_manager import ConversationType, SessionManager @@ -68,6 +68,7 @@ def init( blocked_spans: Optional[List[str]] = None, instruments: Optional[Set[NetraInstruments]] = None, block_instruments: Optional[Set[NetraInstruments]] = None, + root_instruments: Optional[Set[NetraInstruments]] = None, ) -> None: """ Thread-safe initialization of Netra. @@ -85,6 +86,12 @@ def init( blocked_spans: List of spans to be blocked instruments: Set of instruments to be enabled block_instruments: Set of instruments to be blocked + root_instruments: Set of instruments allowed to produce root-level + spans. When a root span is blocked, its entire subtree is + discarded. Resolution priority: + 1. Explicit ``root_instruments`` value if provided. + 2. The ``instruments`` value if provided (but ``root_instruments`` is not). + 3. ``DEFAULT_INSTRUMENTS_FOR_ROOT`` if neither is provided. Returns: None @@ -111,8 +118,17 @@ def init( # Configure logging based on debug mode configure_package_logging(debug_mode=cfg.debug_mode) + # Resolve root_instruments → set of instrumentation-name strings. + resolved_root: Optional[Set[str]] = None + if root_instruments is not None: + resolved_root = {m.value for m in root_instruments} + elif instruments is not None: + resolved_root = {m.value for m in instruments} + else: + resolved_root = {m.value for m in DEFAULT_INSTRUMENTS_FOR_ROOT} + # Initialize tracer (OTLP exporter, span processor, resource) - Tracer(cfg) + Tracer(cfg, root_instrument_names=resolved_root) # Initialize evaluation client and expose as class attribute try: diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index 49f27ae..7ff379b 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -7,7 +7,7 @@ from traceloop.sdk import Instruments, Telemetry from traceloop.sdk.utils.package_check import is_package_installed -from netra.instrumentation.instruments import CustomInstruments, NetraInstruments +from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS, CustomInstruments, NetraInstruments def init_instrumentations( @@ -18,36 +18,34 @@ def init_instrumentations( ) -> None: from traceloop.sdk.tracing.tracing import init_instrumentations + # When the user does not pass instruments, fall back to the curated default set. + resolved_instruments = instruments if instruments is not None else DEFAULT_INSTRUMENTS + traceloop_instruments = set() traceloop_block_instruments = set() netra_custom_instruments = set() netra_custom_block_instruments = set() - if instruments: - for instrument in instruments: - if instrument.origin == CustomInstruments: # type: ignore[attr-defined] + if resolved_instruments: + for instrument in resolved_instruments: + if isinstance(instrument, CustomInstruments): netra_custom_instruments.add(getattr(CustomInstruments, instrument.name)) else: traceloop_instruments.add(getattr(Instruments, instrument.name)) if block_instruments: for instrument in block_instruments: - if instrument.origin == CustomInstruments: # type: ignore[attr-defined] + if isinstance(instrument, CustomInstruments): netra_custom_block_instruments.add(getattr(CustomInstruments, instrument.name)) else: traceloop_block_instruments.add(getattr(Instruments, instrument.name)) # If no instruments in traceloop are provided for instrumentation - if instruments and not traceloop_instruments and not traceloop_block_instruments: + if resolved_instruments and not traceloop_instruments and not traceloop_block_instruments: traceloop_block_instruments = set(Instruments) # If no custom instruments in netra are provided for instrumentation - if instruments and not netra_custom_instruments and not netra_custom_block_instruments: + if resolved_instruments and not netra_custom_instruments and not netra_custom_block_instruments: netra_custom_block_instruments = set(CustomInstruments) - # If no instruments are provided for instrumentation, instrument all instruments - if not instruments and not block_instruments: - traceloop_instruments = set(Instruments) - netra_custom_instruments = set(CustomInstruments) - netra_custom_instruments = netra_custom_instruments - netra_custom_block_instruments traceloop_instruments = traceloop_instruments - traceloop_block_instruments if not traceloop_instruments: diff --git a/netra/instrumentation/instruments.py b/netra/instrumentation/instruments.py index 1db7ea6..2bcbd54 100644 --- a/netra/instrumentation/instruments.py +++ b/netra/instrumentation/instruments.py @@ -94,6 +94,56 @@ def __new__(cls: Any, value: Any, origin: Any = None) -> Any: InstrumentSet = NetraInstruments("InstrumentSet", merged_members) +# Curated default instrument set used for root_instruments when the user does +# not pass an explicit value. Covers core LLM/AI providers and frameworks. +DEFAULT_INSTRUMENTS_FOR_ROOT = { + InstrumentSet.ANTHROPIC, # type:ignore[attr-defined] + InstrumentSet.CARTESIA, # type:ignore[attr-defined] + InstrumentSet.COHEREAI, # type:ignore[attr-defined] + InstrumentSet.CREWAI, # type:ignore[attr-defined] + InstrumentSet.DEEPGRAM, # type:ignore[attr-defined] + InstrumentSet.ELEVENLABS, # type:ignore[attr-defined] + InstrumentSet.GOOGLE_GENERATIVEAI, # type:ignore[attr-defined] + InstrumentSet.ADK, # type:ignore[attr-defined] + InstrumentSet.GROQ, # type:ignore[attr-defined] + InstrumentSet.LANGCHAIN, # type:ignore[attr-defined] + InstrumentSet.LITELLM, # type:ignore[attr-defined] + InstrumentSet.CEREBRAS, # type:ignore[attr-defined] + InstrumentSet.MISTRALAI, # type:ignore[attr-defined] + InstrumentSet.OPENAI, # type:ignore[attr-defined] + InstrumentSet.OLLAMA, # type:ignore[attr-defined] + InstrumentSet.VERTEXAI, # type:ignore[attr-defined] + InstrumentSet.LLAMA_INDEX, # type:ignore[attr-defined] + InstrumentSet.PYDANTIC_AI, # type:ignore[attr-defined] + InstrumentSet.DSPY, # type:ignore[attr-defined] + InstrumentSet.HAYSTACK, # type:ignore[attr-defined] + InstrumentSet.BEDROCK, # type:ignore[attr-defined] + InstrumentSet.TOGETHER, # type:ignore[attr-defined] + InstrumentSet.REPLICATE, # type:ignore[attr-defined] + InstrumentSet.ALEPHALPHA, # type:ignore[attr-defined] + InstrumentSet.WATSONX, # type:ignore[attr-defined] +} + +# Broader default instrument set used for the ``instruments`` parameter when +# the user does not pass an explicit value. Includes the root defaults plus +# common vector DBs, HTTP client/server, and database ORM/client libraries. +DEFAULT_INSTRUMENTS = DEFAULT_INSTRUMENTS_FOR_ROOT.union( + { + InstrumentSet.PINECONE, # type:ignore[attr-defined] + InstrumentSet.CHROMA, # type:ignore[attr-defined] + InstrumentSet.WEAVIATEDB, # type:ignore[attr-defined] + InstrumentSet.QDRANTDB, # type:ignore[attr-defined] + InstrumentSet.MILVUS, # type:ignore[attr-defined] + InstrumentSet.LANCEDB, # type:ignore[attr-defined] + InstrumentSet.MARQO, # type:ignore[attr-defined] + InstrumentSet.PYMYSQL, # type:ignore[attr-defined] + InstrumentSet.REQUESTS, # type:ignore[attr-defined] + InstrumentSet.SQLALCHEMY, # type:ignore[attr-defined] + InstrumentSet.HTTPX, # type:ignore[attr-defined] + } +) + + ##################################################################################### """ NetraInstruments follows the given structure. Refer this for usage within Netra SDK: diff --git a/netra/processors/__init__.py b/netra/processors/__init__.py index fb3b0ee..7c8e43f 100644 --- a/netra/processors/__init__.py +++ b/netra/processors/__init__.py @@ -1,6 +1,7 @@ from netra.processors.instrumentation_span_processor import InstrumentationSpanProcessor from netra.processors.llm_trace_identifier_span_processor import LlmTraceIdentifierSpanProcessor from netra.processors.local_filtering_span_processor import LocalFilteringSpanProcessor +from netra.processors.root_instrument_filter_processor import RootInstrumentFilterProcessor from netra.processors.scrubbing_span_processor import ScrubbingSpanProcessor from netra.processors.session_span_processor import SessionSpanProcessor @@ -10,4 +11,5 @@ "LlmTraceIdentifierSpanProcessor", "ScrubbingSpanProcessor", "LocalFilteringSpanProcessor", + "RootInstrumentFilterProcessor", ] diff --git a/netra/processors/root_instrument_filter_processor.py b/netra/processors/root_instrument_filter_processor.py new file mode 100644 index 0000000..d506381 --- /dev/null +++ b/netra/processors/root_instrument_filter_processor.py @@ -0,0 +1,205 @@ +import logging +import threading +from typing import Any, Optional, Set + +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor +from opentelemetry.trace import INVALID_SPAN_ID + +logger = logging.getLogger(__name__) + +# Attribute written on blocked spans so that the FilteringSpanExporter drops them. +_LOCAL_BLOCKED_ATTR = "netra.local_blocked" + + +class RootInstrumentFilterProcessor(SpanProcessor): # type: ignore[misc] + """Blocks root spans (and their entire subtree) from instrumentations not in + the allowed *root_instruments* set. + + The set stores the **instrumentation name values** (e.g. ``"openai"``, + ``"adk"``, ``"google_genai"``) that are permitted to create root-level spans. + Any root span whose instrumentation name is *not* in this set is marked with + ``netra.local_blocked = True`` and its ``span_id`` is recorded. Child spans + whose parent ``span_id`` appears in the blocked registry inherit the block. + + Args: + allowed_root_instrument_names: Set of instrumentation-name strings + (matching ``InstrumentSet`` member values) that are allowed to + produce root spans. + """ + + def __init__(self, allowed_root_instrument_names: Set[str]) -> None: + """ + Initialize the processor with a set of allowed root instrument names. + + Args: + allowed_root_instrument_names: Set of instrumentation-name strings + (matching ``InstrumentSet`` member values) that are allowed to + produce root spans. + """ + self._allowed: frozenset[str] = frozenset(allowed_root_instrument_names) + # span_id -> True for every span that belongs to a blocked root tree. + self._blocked_span_ids: dict[int, bool] = {} + self._lock = threading.Lock() + + def on_start( + self, + span: Span, + parent_context: Optional[otel_context.Context] = None, + ) -> None: + """ + Called when a span is started. + + Args: + span: The span that is being started. + parent_context: The parent context of the span. + """ + try: + self._process_span_start(span, parent_context) + except Exception: + logger.debug("RootInstrumentFilterProcessor.on_start failed", exc_info=True) + + def on_end(self, span: ReadableSpan) -> None: + """ + Called when a span is ended. + + Args: + span: The span that is being ended. + """ + try: + span_id = self._get_span_id(span) + if span_id is not None: + with self._lock: + self._blocked_span_ids.pop(span_id, None) + except Exception: + pass + + def shutdown(self) -> None: + """ + Called when the processor is shut down. + """ + with self._lock: + self._blocked_span_ids.clear() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """ + Called when the processor is forced to flush. + + Args: + timeout_millis: The timeout in milliseconds. + + Returns: + True if the flush was successful, False otherwise. + """ + return True + + def _process_span_start( + self, + span: Span, + parent_context: Optional[otel_context.Context], + ) -> None: + """ + Processes the start of a span. + + Args: + span: The span that is being started. + parent_context: The parent context of the span. + """ + parent_span_id = self._resolve_parent_span_id(parent_context) + + if parent_span_id is not None and parent_span_id != INVALID_SPAN_ID: + # This is a child span – inherit blocked status from parent. + with self._lock: + if parent_span_id in self._blocked_span_ids: + own_id = self._get_span_id(span) + if own_id is not None: + self._blocked_span_ids[own_id] = True + self._mark_blocked(span) + return + + # Root span – check instrumentation name against the allow-list. + instr_name = self._extract_instrumentation_name(span) + if instr_name is not None and instr_name not in self._allowed: + own_id = self._get_span_id(span) + if own_id is not None: + with self._lock: + self._blocked_span_ids[own_id] = True + self._mark_blocked(span) + + @staticmethod + def _resolve_parent_span_id( + parent_context: Optional[otel_context.Context], + ) -> Any: + """ + Return the parent span's ``span_id`` from the supplied context, or ``None``. + + Args: + parent_context: The parent context of the span. + + Returns: + The parent span's ``span_id`` or ``None``. + """ + if parent_context is None: + return None + parent_span = trace.get_current_span(parent_context) + if parent_span is None: + return None + sc = parent_span.get_span_context() + if sc is None: + return None + return sc.span_id + + @staticmethod + def _get_span_id(span: object) -> Optional[int]: + """ + Get the span ID from the span. + + Args: + span: The span to get the ID from. + + Returns: + The span ID or None. + """ + ctx = getattr(span, "context", None) or getattr(span, "get_span_context", lambda: None)() + if ctx is None: + return None + return getattr(ctx, "span_id", None) + + @staticmethod + def _mark_blocked(span: Span) -> None: + """ + Mark the span as blocked. + + Args: + span: The span to mark as blocked. + """ + try: + span.set_attribute(_LOCAL_BLOCKED_ATTR, True) + except Exception: + pass + + @staticmethod + def _extract_instrumentation_name(span: Span) -> Optional[str]: + """ + Extract the short instrumentation name from the span's scope. + + Mirrors the logic in ``InstrumentationSpanProcessor._extract_instrumentation_name``. + + Args: + span: The span to extract the instrumentation name from. + + Returns: + The instrumentation name or None. + """ + scope = getattr(span, "instrumentation_scope", None) + if scope is None: + return None + name = getattr(scope, "name", None) + if not isinstance(name, str) or not name: + return None + for prefix in ("opentelemetry.instrumentation.", "netra.instrumentation."): + if name.startswith(prefix): + base = name.rsplit(".", 1)[-1].strip() + return base if base else name + return name diff --git a/netra/tracer.py b/netra/tracer.py index eb3d9c9..1918f32 100644 --- a/netra/tracer.py +++ b/netra/tracer.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Any, Dict +from typing import Any, Dict, Optional, Set from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter @@ -27,13 +27,18 @@ class Tracer: and appropriate span processors. """ - def __init__(self, cfg: Config) -> None: + def __init__(self, cfg: Config, root_instrument_names: Optional[Set[str]] = None) -> None: """Initialize the Netra tracer with the provided configuration. Args: cfg: Configuration object with tracer settings + root_instrument_names: Optional set of instrumentation-name strings + that are allowed to produce root-level spans. When provided, a + ``RootInstrumentFilterProcessor`` is installed that discards root + spans (and their entire subtree) from all other instrumentations. """ self.cfg = cfg + self._root_instrument_names = root_instrument_names self._setup_tracer() def _setup_tracer(self) -> None: @@ -93,10 +98,14 @@ def _setup_tracer(self) -> None: InstrumentationSpanProcessor, LlmTraceIdentifierSpanProcessor, LocalFilteringSpanProcessor, + RootInstrumentFilterProcessor, ScrubbingSpanProcessor, SessionSpanProcessor, ) + if self._root_instrument_names is not None: + provider.add_span_processor(RootInstrumentFilterProcessor(self._root_instrument_names)) + provider.add_span_processor(LocalFilteringSpanProcessor()) provider.add_span_processor(InstrumentationSpanProcessor()) provider.add_span_processor(SessionSpanProcessor()) From 1ec6416ee1e54af69aa6aecb17fbd8618041083f Mon Sep 17 00:00:00 2001 From: akash-vijay-kv Date: Wed, 18 Mar 2026 18:39:49 +0530 Subject: [PATCH 5/5] fix: Update the runtime merging of InstrumentSet with static merging --- netra/instrumentation/__init__.py | 4 +- netra/instrumentation/instruments.py | 171 +++++++++++++----- .../instrumentation_span_processor.py | 2 +- .../root_instrument_filter_processor.py | 8 +- 4 files changed, 133 insertions(+), 52 deletions(-) diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index 7ff379b..172d288 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -27,13 +27,13 @@ def init_instrumentations( netra_custom_block_instruments = set() if resolved_instruments: for instrument in resolved_instruments: - if isinstance(instrument, CustomInstruments): + if instrument.origin == CustomInstruments: # type: ignore[attr-defined] netra_custom_instruments.add(getattr(CustomInstruments, instrument.name)) else: traceloop_instruments.add(getattr(Instruments, instrument.name)) if block_instruments: for instrument in block_instruments: - if isinstance(instrument, CustomInstruments): + if instrument.origin == CustomInstruments: # type: ignore[attr-defined] netra_custom_block_instruments.add(getattr(CustomInstruments, instrument.name)) else: traceloop_block_instruments.add(getattr(Instruments, instrument.name)) diff --git a/netra/instrumentation/instruments.py b/netra/instrumentation/instruments.py index 2bcbd54..c51cc90 100644 --- a/netra/instrumentation/instruments.py +++ b/netra/instrumentation/instruments.py @@ -73,7 +73,7 @@ class CustomInstruments(Enum): ELEVENLABS = "elevenlabs" -class NetraInstruments(Enum): +class InstrumentSet(Enum): """Custom enum that stores the original enum class in an 'origin' attribute.""" def __new__(cls: Any, value: Any, origin: Any = None) -> Any: @@ -82,46 +82,127 @@ def __new__(cls: Any, value: Any, origin: Any = None) -> Any: member.origin = origin return member + ADK = ("adk", CustomInstruments) + AIOHTTP = ("aiohttp", CustomInstruments) + AIOHTTP_SERVER = ("aiohttp_server", CustomInstruments) + AIO_PIKA = ("aio_pika", CustomInstruments) + AIOKAFKA = ("aiokafka", CustomInstruments) + AIOPG = ("aiopg", CustomInstruments) + ALEPHALPHA = ("alephalpha", Instruments) + ANTHROPIC = ("anthropic", Instruments) + ASGI = ("asgi", CustomInstruments) + ASYNCCLICK = ("asyncclick", CustomInstruments) + ASYNCIO = ("asyncio", CustomInstruments) + ASYNCPG = ("asyncpg", CustomInstruments) + AWS_LAMBDA = ("aws_lambda", CustomInstruments) + BEDROCK = ("bedrock", Instruments) + BOTO = ("boto", CustomInstruments) + BOTO3SQS = ("boto3sqs", CustomInstruments) + BOTOCORE = ("botocore", CustomInstruments) + CARTESIA = ("cartesia", CustomInstruments) + CASSANDRA = ("cassandra", CustomInstruments) + CEREBRAS = ("cerebras", CustomInstruments) + CELERY = ("celery", CustomInstruments) + CHROMA = ("chroma", Instruments) + CLICK = ("click", CustomInstruments) + COHEREAI = ("cohere_ai", CustomInstruments) + CONFLUENT_KAFKA = ("confluent_kafka", CustomInstruments) + CREWAI = ("crewai", Instruments) + DEEPGRAM = ("deepgram", CustomInstruments) + DBAPI = ("dbapi", CustomInstruments) + DJANGO = ("django", CustomInstruments) + DSPY = ("dspy", CustomInstruments) + ELASTICSEARCH = ("elasticsearch", CustomInstruments) + ELEVENLABS = ("elevenlabs", CustomInstruments) + FALCON = ("falcon", CustomInstruments) + FASTAPI = ("fastapi", CustomInstruments) + FLASK = ("flask", CustomInstruments) + GOOGLE_GENERATIVEAI = ("google_genai", CustomInstruments) + GROQ = ("groq", CustomInstruments) + GRPC = ("grpc", CustomInstruments) + HAYSTACK = ("haystack", Instruments) + HTTPX = ("httpx", CustomInstruments) + JINJA2 = ("jinja2", CustomInstruments) + KAFKA_PYTHON = ("kafka_python", CustomInstruments) + LANCEDB = ("lancedb", Instruments) + LANGCHAIN = ("langchain", Instruments) + LITELLM = ("litellm", CustomInstruments) + LLAMA_INDEX = ("llama_index", Instruments) + LOGGING = ("logging", CustomInstruments) + MARQO = ("marqo", Instruments) + MCP = ("mcp", Instruments) + MILVUS = ("milvus", Instruments) + MISTRALAI = ("mistral_ai", CustomInstruments) + MYSQL = ("mysql", CustomInstruments) + MYSQLCLIENT = ("mysqlclient", CustomInstruments) + OLLAMA = ("ollama", Instruments) + OPENAI = ("openai", CustomInstruments) + OPENAI_AGENTS = ("openai_agents", Instruments) + PIKA = ("pika", CustomInstruments) + PINECONE = ("pinecone", Instruments) + PSYCOPG = ("psycopg", CustomInstruments) + PSYCOPG2 = ("psycopg2", CustomInstruments) + PYDANTIC_AI = ("pydantic_ai", CustomInstruments) + PYMEMCACHE = ("pymemcache", CustomInstruments) + PYMONGO = ("pymongo", CustomInstruments) + PYMSSQL = ("pymssql", CustomInstruments) + PYMYSQL = ("pymysql", CustomInstruments) + PYRAMID = ("pyramid", CustomInstruments) + QDRANTDB = ("qdrant_db", CustomInstruments) + REDIS = ("redis", CustomInstruments) + REMOULADE = ("remoulade", CustomInstruments) + REPLICATE = ("replicate", Instruments) + REQUESTS = ("requests", CustomInstruments) + SAGEMAKER = ("sagemaker", Instruments) + SQLALCHEMY = ("sqlalchemy", CustomInstruments) + SQLITE3 = ("sqlite3", CustomInstruments) + STARLETTE = ("starlette", CustomInstruments) + SYSTEM_METRICS = ("system_metrics", CustomInstruments) + THREADING = ("threading", CustomInstruments) + TOGETHER = ("together", Instruments) + TORNADO = ("tornado", CustomInstruments) + TORTOISEORM = ("tortoiseorm", CustomInstruments) + TRANSFORMERS = ("transformers", Instruments) + URLLIB = ("urllib", CustomInstruments) + URLLIB3 = ("urllib3", CustomInstruments) + VERTEXAI = ("vertexai", Instruments) + WATSONX = ("watsonx", Instruments) + WEAVIATEDB = ("weaviate_db", CustomInstruments) + WRITER = ("writer", Instruments) + WSGI = ("wsgi", CustomInstruments) -merged_members = {} - -for member in Instruments: - merged_members[member.name] = (member.value, Instruments) - -for member in CustomInstruments: - merged_members[member.name] = (member.value, CustomInstruments) -InstrumentSet = NetraInstruments("InstrumentSet", merged_members) +NetraInstruments = InstrumentSet # Curated default instrument set used for root_instruments when the user does # not pass an explicit value. Covers core LLM/AI providers and frameworks. DEFAULT_INSTRUMENTS_FOR_ROOT = { - InstrumentSet.ANTHROPIC, # type:ignore[attr-defined] - InstrumentSet.CARTESIA, # type:ignore[attr-defined] - InstrumentSet.COHEREAI, # type:ignore[attr-defined] - InstrumentSet.CREWAI, # type:ignore[attr-defined] - InstrumentSet.DEEPGRAM, # type:ignore[attr-defined] - InstrumentSet.ELEVENLABS, # type:ignore[attr-defined] - InstrumentSet.GOOGLE_GENERATIVEAI, # type:ignore[attr-defined] - InstrumentSet.ADK, # type:ignore[attr-defined] - InstrumentSet.GROQ, # type:ignore[attr-defined] - InstrumentSet.LANGCHAIN, # type:ignore[attr-defined] - InstrumentSet.LITELLM, # type:ignore[attr-defined] - InstrumentSet.CEREBRAS, # type:ignore[attr-defined] - InstrumentSet.MISTRALAI, # type:ignore[attr-defined] - InstrumentSet.OPENAI, # type:ignore[attr-defined] - InstrumentSet.OLLAMA, # type:ignore[attr-defined] - InstrumentSet.VERTEXAI, # type:ignore[attr-defined] - InstrumentSet.LLAMA_INDEX, # type:ignore[attr-defined] - InstrumentSet.PYDANTIC_AI, # type:ignore[attr-defined] - InstrumentSet.DSPY, # type:ignore[attr-defined] - InstrumentSet.HAYSTACK, # type:ignore[attr-defined] - InstrumentSet.BEDROCK, # type:ignore[attr-defined] - InstrumentSet.TOGETHER, # type:ignore[attr-defined] - InstrumentSet.REPLICATE, # type:ignore[attr-defined] - InstrumentSet.ALEPHALPHA, # type:ignore[attr-defined] - InstrumentSet.WATSONX, # type:ignore[attr-defined] + InstrumentSet.ANTHROPIC, + InstrumentSet.CARTESIA, + InstrumentSet.COHEREAI, + InstrumentSet.CREWAI, + InstrumentSet.DEEPGRAM, + InstrumentSet.ELEVENLABS, + InstrumentSet.GOOGLE_GENERATIVEAI, + InstrumentSet.ADK, + InstrumentSet.GROQ, + InstrumentSet.LANGCHAIN, + InstrumentSet.LITELLM, + InstrumentSet.CEREBRAS, + InstrumentSet.MISTRALAI, + InstrumentSet.OPENAI, + InstrumentSet.OLLAMA, + InstrumentSet.VERTEXAI, + InstrumentSet.LLAMA_INDEX, + InstrumentSet.PYDANTIC_AI, + InstrumentSet.DSPY, + InstrumentSet.HAYSTACK, + InstrumentSet.BEDROCK, + InstrumentSet.TOGETHER, + InstrumentSet.REPLICATE, + InstrumentSet.ALEPHALPHA, + InstrumentSet.WATSONX, } # Broader default instrument set used for the ``instruments`` parameter when @@ -129,17 +210,17 @@ def __new__(cls: Any, value: Any, origin: Any = None) -> Any: # common vector DBs, HTTP client/server, and database ORM/client libraries. DEFAULT_INSTRUMENTS = DEFAULT_INSTRUMENTS_FOR_ROOT.union( { - InstrumentSet.PINECONE, # type:ignore[attr-defined] - InstrumentSet.CHROMA, # type:ignore[attr-defined] - InstrumentSet.WEAVIATEDB, # type:ignore[attr-defined] - InstrumentSet.QDRANTDB, # type:ignore[attr-defined] - InstrumentSet.MILVUS, # type:ignore[attr-defined] - InstrumentSet.LANCEDB, # type:ignore[attr-defined] - InstrumentSet.MARQO, # type:ignore[attr-defined] - InstrumentSet.PYMYSQL, # type:ignore[attr-defined] - InstrumentSet.REQUESTS, # type:ignore[attr-defined] - InstrumentSet.SQLALCHEMY, # type:ignore[attr-defined] - InstrumentSet.HTTPX, # type:ignore[attr-defined] + InstrumentSet.PINECONE, + InstrumentSet.CHROMA, + InstrumentSet.WEAVIATEDB, + InstrumentSet.QDRANTDB, + InstrumentSet.MILVUS, + InstrumentSet.LANCEDB, + InstrumentSet.MARQO, + InstrumentSet.PYMYSQL, + InstrumentSet.REQUESTS, + InstrumentSet.SQLALCHEMY, + InstrumentSet.HTTPX, } ) diff --git a/netra/processors/instrumentation_span_processor.py b/netra/processors/instrumentation_span_processor.py index 6b85d1f..6444069 100644 --- a/netra/processors/instrumentation_span_processor.py +++ b/netra/processors/instrumentation_span_processor.py @@ -62,7 +62,7 @@ def _get_blocked_url_patterns() -> frozenset[str]: # Pre-computed allowed instrumentation names -_ALLOWED_INSTRUMENTATION_NAMES: Set[str] = {member.value for member in InstrumentSet} # type: ignore[attr-defined] +_ALLOWED_INSTRUMENTATION_NAMES: Set[str] = {member.value for member in InstrumentSet} class InstrumentationSpanProcessor(SpanProcessor): # type: ignore[misc] diff --git a/netra/processors/root_instrument_filter_processor.py b/netra/processors/root_instrument_filter_processor.py index d506381..9b564e4 100644 --- a/netra/processors/root_instrument_filter_processor.py +++ b/netra/processors/root_instrument_filter_processor.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Any, Optional, Set +from typing import Optional, Set, cast from opentelemetry import context as otel_context from opentelemetry import trace @@ -130,7 +130,7 @@ def _process_span_start( @staticmethod def _resolve_parent_span_id( parent_context: Optional[otel_context.Context], - ) -> Any: + ) -> Optional[int]: """ Return the parent span's ``span_id`` from the supplied context, or ``None``. @@ -148,7 +148,7 @@ def _resolve_parent_span_id( sc = parent_span.get_span_context() if sc is None: return None - return sc.span_id + return cast(Optional[int], sc.span_id) @staticmethod def _get_span_id(span: object) -> Optional[int]: @@ -164,7 +164,7 @@ def _get_span_id(span: object) -> Optional[int]: ctx = getattr(span, "context", None) or getattr(span, "get_span_context", lambda: None)() if ctx is None: return None - return getattr(ctx, "span_id", None) + return cast(Optional[int], getattr(ctx, "span_id", None)) @staticmethod def _mark_blocked(span: Span) -> None: